From 1819b8c90b8440d6d896f4315380651f1880989c Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Tue, 9 Apr 2024 11:58:06 -0400 Subject: [PATCH] feat(api): Pause when `pick_up_tip()` errors in a Python protocol (#14753) --- .../protocol_api/core/engine/instrument.py | 7 +- .../protocol_engine/actions/__init__.py | 2 + .../protocol_engine/actions/actions.py | 21 + .../protocol_engine/clients/sync_client.py | 23 + .../protocol_engine/clients/transports.py | 115 ++++- .../execution/command_executor.py | 1 + .../protocol_engine/execution/queue_worker.py | 3 + .../protocol_engine/protocol_engine.py | 81 +++- .../protocol_engine/state/commands.py | 10 + .../opentrons/protocol_engine/state/state.py | 55 ++- .../opentrons/protocol_engine/state/tips.py | 32 +- .../protocol_runner/legacy_command_mapper.py | 1 + .../core/engine/test_instrument_core.py | 2 +- .../execution/test_command_executor.py | 1 + .../state/test_command_state.py | 361 ++++++++++++++- .../state/test_command_store_old.py | 425 +----------------- .../state/test_command_view_old.py | 2 + .../protocol_engine/state/test_state_store.py | 42 +- .../protocol_engine/test_protocol_engine.py | 102 +++++ .../test_legacy_command_mapper.py | 2 + 20 files changed, 810 insertions(+), 478 deletions(-) diff --git a/api/src/opentrons/protocol_api/core/engine/instrument.py b/api/src/opentrons/protocol_api/core/engine/instrument.py index 9c88a4f7ecb..485f45d0e94 100644 --- a/api/src/opentrons/protocol_api/core/engine/instrument.py +++ b/api/src/opentrons/protocol_api/core/engine/instrument.py @@ -408,13 +408,18 @@ def pick_up_tip( well_name=well_name, well_location=well_location, ) - self._engine_client.pick_up_tip( + + self._engine_client.pick_up_tip_wait_for_recovery( pipette_id=self._pipette_id, labware_id=labware_id, well_name=well_name, well_location=well_location, ) + # Set the "last location" unconditionally, even if the command failed + # and was recovered from and we don't know if the pipette is physically here. + # This isn't used for path planning, but rather for implicit destination + # selection like in `pipette.aspirate(location=None)`. self._protocol_core.set_last_location(location=location, mount=self.get_mount()) def drop_tip( diff --git a/api/src/opentrons/protocol_engine/actions/__init__.py b/api/src/opentrons/protocol_engine/actions/__init__.py index b1181e6a50e..ac3fc653976 100644 --- a/api/src/opentrons/protocol_engine/actions/__init__.py +++ b/api/src/opentrons/protocol_engine/actions/__init__.py @@ -11,6 +11,7 @@ PauseAction, PauseSource, StopAction, + ResumeFromRecoveryAction, FinishAction, HardwareStoppedAction, QueueCommandAction, @@ -38,6 +39,7 @@ "PlayAction", "PauseAction", "StopAction", + "ResumeFromRecoveryAction", "FinishAction", "HardwareStoppedAction", "QueueCommandAction", diff --git a/api/src/opentrons/protocol_engine/actions/actions.py b/api/src/opentrons/protocol_engine/actions/actions.py index d5c6bb49abc..ee36e76f7de 100644 --- a/api/src/opentrons/protocol_engine/actions/actions.py +++ b/api/src/opentrons/protocol_engine/actions/actions.py @@ -154,11 +154,32 @@ class FailCommandAction: """ command_id: str + """The command to fail.""" + error_id: str + """An ID to assign to the command's error. + + Must be unique to this occurrence of the error. + """ + failed_at: datetime + """When the command failed.""" + error: EnumeratedError + """The underlying exception that caused this command to fail.""" + notes: List[CommandNote] + """Overwrite the command's `.notes` with these.""" + type: ErrorRecoveryType + """How this error should be handled in the context of the overall run.""" + + # This is a quick hack so FailCommandAction handlers can get the params of the + # command that failed. We probably want this to be a new "failure details" + # object instead, similar to how succeeded commands can send a "private result" + # to Protocol Engine internals. + running_command: Command + """The command to fail, in its prior `running` state.""" @dataclass(frozen=True) diff --git a/api/src/opentrons/protocol_engine/clients/sync_client.py b/api/src/opentrons/protocol_engine/clients/sync_client.py index f9c9e2ee6c6..f95611c1b4c 100644 --- a/api/src/opentrons/protocol_engine/clients/sync_client.py +++ b/api/src/opentrons/protocol_engine/clients/sync_client.py @@ -296,6 +296,29 @@ def pick_up_tip( return cast(commands.PickUpTipResult, result) + def pick_up_tip_wait_for_recovery( + self, + pipette_id: str, + labware_id: str, + well_name: str, + well_location: WellLocation, + ) -> commands.PickUpTip: + """Execute a PickUpTip, wait for any error recovery, and return it. + + Note that the returned command will not necessarily have a `result`. + """ + request = commands.PickUpTipCreate( + params=commands.PickUpTipParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + ) + ) + command = self._transport.execute_command_wait_for_recovery(request=request) + + return cast(commands.PickUpTip, command) + def drop_tip( self, pipette_id: str, diff --git a/api/src/opentrons/protocol_engine/clients/transports.py b/api/src/opentrons/protocol_engine/clients/transports.py index 270599ff469..6de08db97ed 100644 --- a/api/src/opentrons/protocol_engine/clients/transports.py +++ b/api/src/opentrons/protocol_engine/clients/transports.py @@ -1,15 +1,28 @@ """A helper for controlling a `ProtocolEngine` without async/await.""" from asyncio import AbstractEventLoop, run_coroutine_threadsafe -from typing import Any, overload +from typing import Any, Final, overload from typing_extensions import Literal from opentrons_shared_data.labware.dev_types import LabwareUri from opentrons_shared_data.labware.labware_definition import LabwareDefinition + from ..protocol_engine import ProtocolEngine from ..errors import ProtocolCommandFailedError +from ..error_recovery_policy import ErrorRecoveryType from ..state import StateView -from ..commands import CommandCreate, CommandResult +from ..commands import Command, CommandCreate, CommandResult, CommandStatus + + +class RunStoppedBeforeCommandError(RuntimeError): + """Raised if the ProtocolEngine was stopped before a command could start.""" + + def __init__(self, command: Command) -> None: + self._command = command + super().__init__( + f"The run was stopped" + f" before {command.commandType} command {command.id} could execute." + ) class ChildThreadTransport: @@ -30,8 +43,10 @@ def __init__(self, engine: ProtocolEngine, loop: AbstractEventLoop) -> None: want to synchronously access it. loop: The event loop that `engine` is running in (in the other thread). """ - self._engine = engine - self._loop = loop + # We might access these from different threads, + # so let's make them Final for (shallow) immutability. + self._engine: Final = engine + self._loop: Final = loop @property def state(self) -> StateView: @@ -39,7 +54,11 @@ def state(self) -> StateView: return self._engine.state_view def execute_command(self, request: CommandCreate) -> CommandResult: - """Execute a ProtocolEngine command, blocking until the command completes. + """Execute a ProtocolEngine command. + + This blocks until the command completes. If the command fails, this will always + raise the failure as an exception--even if ProtocolEngine deemed the failure + recoverable. Args: request: The ProtocolEngine command request @@ -48,8 +67,11 @@ def execute_command(self, request: CommandCreate) -> CommandResult: The command's result data. Raises: - ProtocolEngineError: if the command execution is not successful, - the specific error that cause the command to fail is raised. + ProtocolEngineError: If the command execution was not successful, + the specific error that caused the command to fail is raised. + + If the run was stopped before the command could complete, that's + also signaled as this exception. """ command = run_coroutine_threadsafe( self._engine.add_and_execute_command(request=request), @@ -64,21 +86,76 @@ def execute_command(self, request: CommandCreate) -> CommandResult: message=f"{error.errorType}: {error.detail}", ) - # FIXME(mm, 2023-04-10): This assert can easily trigger from this sequence: - # - # 1. The engine is paused. - # 2. The user's Python script calls this method to start a new command, - # which remains `queued` because of the pause. - # 3. The engine is stopped. - # - # The returned command will be `queued`, so it won't have a result. - # - # We need to figure out a proper way to report this condition to callers - # so they correctly interpret it as an intentional stop, not an internal error. - assert command.result is not None, f"Expected Command {command} to have result" + if command.result is None: + # This can happen with a certain pause timing: + # + # 1. The engine is paused. + # 2. The user's Python script calls this method to start a new command, + # which remains `queued` because of the pause. + # 3. The engine is stopped. The returned command will be `queued` + # and won't have a result. + raise RunStoppedBeforeCommandError(command) return command.result + def execute_command_wait_for_recovery(self, request: CommandCreate) -> Command: + """Execute a ProtocolEngine command, including error recovery. + + This blocks until the command completes. Additionally, if the command fails, + this will continue to block until its error recovery has been completed. + + Args: + request: The ProtocolEngine command request. + + Returns: + The command. If error recovery happened for it, the command will be + reported here as failed. + + Raises: + ProtocolEngineError: If the command failed, *and* the failure was not + recovered from. + + If the run was stopped before the command could complete, that's + also signalled as this exception. + """ + + async def run_in_pe_thread() -> Command: + command = await self._engine.add_and_execute_command_wait_for_recovery( + request=request + ) + + if command.error is not None: + error_was_recovered_from = ( + self._engine.state_view.commands.get_error_recovery_type(command.id) + == ErrorRecoveryType.WAIT_FOR_RECOVERY + ) + if not error_was_recovered_from: + error = command.error + # TODO: this needs to have an actual code + raise ProtocolCommandFailedError( + original_error=error, + message=f"{error.errorType}: {error.detail}", + ) + + elif command.status == CommandStatus.QUEUED: + # This can happen with a certain pause timing: + # + # 1. The engine is paused. + # 2. The user's Python script calls this method to start a new command, + # which remains `queued` because of the pause. + # 3. The engine is stopped. The returned command will be `queued`, + # and won't have a result. + raise RunStoppedBeforeCommandError(command) + + return command + + command = run_coroutine_threadsafe( + run_in_pe_thread(), + loop=self._loop, + ).result() + + return command + @overload def call_method( self, diff --git a/api/src/opentrons/protocol_engine/execution/command_executor.py b/api/src/opentrons/protocol_engine/execution/command_executor.py index d44d37f5641..9488d1719e9 100644 --- a/api/src/opentrons/protocol_engine/execution/command_executor.py +++ b/api/src/opentrons/protocol_engine/execution/command_executor.py @@ -167,6 +167,7 @@ async def execute(self, command_id: str) -> None: FailCommandAction( error=error, command_id=running_command.id, + running_command=running_command, error_id=self._model_utils.generate_id(), failed_at=self._model_utils.get_timestamp(), notes=note_tracker.get_notes(), diff --git a/api/src/opentrons/protocol_engine/execution/queue_worker.py b/api/src/opentrons/protocol_engine/execution/queue_worker.py index c1ba60eb143..179880c03e9 100644 --- a/api/src/opentrons/protocol_engine/execution/queue_worker.py +++ b/api/src/opentrons/protocol_engine/execution/queue_worker.py @@ -72,6 +72,9 @@ async def _run_commands(self) -> None: command_id = await self._state_store.wait_for( condition=self._state_store.commands.get_next_to_execute ) + # Assert for type hinting. This is valid because the wait_for() above + # only returns when the value is truthy. + assert command_id is not None except RunStoppedError: # There are no more commands that we should execute, either because the run has # completed on its own, or because a client requested it to stop. diff --git a/api/src/opentrons/protocol_engine/protocol_engine.py b/api/src/opentrons/protocol_engine/protocol_engine.py index 8e23c08013f..bd995f4339a 100644 --- a/api/src/opentrons/protocol_engine/protocol_engine.py +++ b/api/src/opentrons/protocol_engine/protocol_engine.py @@ -234,7 +234,10 @@ async def add_and_execute_command( the command in state. Returns: - The command. If the command completed, it will be succeeded or failed. + The command. + + If the command completed, it will be succeeded or failed. + If the engine was stopped before it reached the command, the command will be queued. """ @@ -242,6 +245,34 @@ async def add_and_execute_command( await self.wait_for_command(command.id) return self._state_store.commands.get(command.id) + async def add_and_execute_command_wait_for_recovery( + self, request: commands.CommandCreate + ) -> commands.Command: + """Like `add_and_execute_command()`, except wait for error recovery. + + Unlike `add_and_execute_command()`, if the command fails, this will not + immediately return the failed command. Instead, if the error is recoverable, + it will wait until error recovery has completed (e.g. when some other task + calls `self.resume_from_recovery()`). + + Returns: + The command. + + If the command completed, it will be succeeded or failed. If it failed + and then its failure was recovered from, it will still be failed. + + If the engine was stopped before it reached the command, + the command will be queued. + """ + queued_command = self.add_command(request) + await self.wait_for_command(command_id=queued_command.id) + completed_command = self._state_store.commands.get(queued_command.id) + await self._state_store.wait_for_not( + self.state_view.commands.get_recovery_in_progress_for_command, + queued_command.id, + ) + return completed_command + def estop( self, # TODO(mm, 2024-03-26): Maintenance runs are a robot-server concept that @@ -251,6 +282,15 @@ def estop( ) -> None: """Signal to the engine that an estop event occurred. + If an estop happens while the robot is moving, lower layers physically stop + motion and raise the event as an exception, which fails the Protocol Engine + command. No action from the `ProtocolEngine` caller is needed to handle that. + + However, if an estop happens in between commands, or in the middle of + a command like `comment` or `waitForDuration` that doesn't access the hardware, + `ProtocolEngine` needs to be told about it so it can treat it as a fatal run + error and stop executing more commands. This method is how to do that. + If there are any queued commands for the engine, they will be marked as failed due to the estop event. If there aren't any queued commands *and* this is a maintenance run (which has commands queued one-by-one), @@ -261,15 +301,27 @@ def estop( """ if self._state_store.commands.get_is_stopped(): return - - current_id = ( + running_or_next_queued_id = ( self._state_store.commands.get_running_command_id() or self._state_store.commands.get_queue_ids().head(None) + # TODO(mm, 2024-04-02): This logic looks wrong whenever the next queued + # command is a setup command, which is the normal case in maintenance + # runs. Setup commands won't show up in commands.get_queue_ids(). + ) + running_or_next_queued = ( + self._state_store.commands.get(running_or_next_queued_id) + if running_or_next_queued_id is not None + else None ) - if current_id is not None: + if running_or_next_queued_id is not None: + assert running_or_next_queued is not None + fail_action = FailCommandAction( - command_id=current_id, + command_id=running_or_next_queued_id, + # FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726, + # this action is only legal if the command is running, not queued. + running_command=running_or_next_queued, error_id=self._model_utils.generate_id(), failed_at=self._model_utils.get_timestamp(), error=EStopActivatedError(message="Estop Activated"), @@ -278,12 +330,21 @@ def estop( ) self._action_dispatcher.dispatch(fail_action) - # In the case where the running command was a setup command - check if there - # are any pending *run* commands and, if so, clear them all - current_id = self._state_store.commands.get_queue_ids().head(None) - if current_id is not None: + # The FailCommandAction above will have cleared all the queued protocol + # OR setup commands, depending on whether we gave it a protocol or setup + # command. We want both to be cleared in either case. So, do that here. + running_or_next_queued_id = self._state_store.commands.get_queue_ids().head( + None + ) + if running_or_next_queued_id is not None: + running_or_next_queued = self._state_store.commands.get( + running_or_next_queued_id + ) fail_action = FailCommandAction( - command_id=current_id, + command_id=running_or_next_queued_id, + # FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726, + # this action is only legal if the command is running, not queued. + running_command=running_or_next_queued, error_id=self._model_utils.generate_id(), failed_at=self._model_utils.get_timestamp(), error=EStopActivatedError(message="Estop Activated"), diff --git a/api/src/opentrons/protocol_engine/state/commands.py b/api/src/opentrons/protocol_engine/state/commands.py index 2c66e45826d..1ae0cb1ed68 100644 --- a/api/src/opentrons/protocol_engine/state/commands.py +++ b/api/src/opentrons/protocol_engine/state/commands.py @@ -178,6 +178,9 @@ class CommandState: stable. Eventually, we might want this info to be stored directly on each command. """ + recovery_target_command_id: Optional[str] + """If we're currently recovering from a command failure, which command it was.""" + finish_error: Optional[ErrorOccurrence] """The error that happened during the post-run finish steps (homing & dropping tips), if any.""" @@ -213,6 +216,7 @@ def __init__( finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_completed_at=None, run_started_at=None, latest_command_hash=None, @@ -300,6 +304,7 @@ def handle_action(self, action: Action) -> None: # noqa: C901 ): if action.type == ErrorRecoveryType.WAIT_FOR_RECOVERY: self._state.queue_status = QueueStatus.AWAITING_RECOVERY + self._state.recovery_target_command_id = action.command_id elif action.type == ErrorRecoveryType.FAIL_RUN: other_command_ids_to_fail = ( self._state.command_history.get_queue_ids() @@ -335,6 +340,7 @@ def handle_action(self, action: Action) -> None: # noqa: C901 elif isinstance(action, ResumeFromRecoveryAction): self._state.queue_status = QueueStatus.RUNNING + self._state.recovery_target_command_id = None elif isinstance(action, StopAction): if not self._state.run_result: @@ -708,6 +714,10 @@ def get_all_commands_final(self) -> bool: return no_command_running and no_command_to_execute + def get_recovery_in_progress_for_command(self, command_id: str) -> bool: + """Return whether the given command failed and its error recovery is in progress.""" + return self._state.recovery_target_command_id == command_id + def raise_fatal_command_error(self) -> None: """Raise the run's fatal command error, if there was one, as an exception. diff --git a/api/src/opentrons/protocol_engine/state/state.py b/api/src/opentrons/protocol_engine/state/state.py index a472b574e6f..6e08bf759c6 100644 --- a/api/src/opentrons/protocol_engine/state/state.py +++ b/api/src/opentrons/protocol_engine/state/state.py @@ -2,8 +2,8 @@ from __future__ import annotations from dataclasses import dataclass -from functools import partial -from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar +from typing import Callable, Dict, List, Optional, Sequence, TypeVar +from typing_extensions import ParamSpec from opentrons_shared_data.deck.dev_types import DeckDefinitionV4 @@ -30,7 +30,9 @@ from .state_summary import StateSummary from ..types import DeckConfigurationType -ReturnT = TypeVar("ReturnT") + +_ParamsT = ParamSpec("_ParamsT") +_ReturnT = TypeVar("_ReturnT") @dataclass(frozen=True) @@ -210,10 +212,10 @@ def handle_action(self, action: Action) -> None: async def wait_for( self, - condition: Callable[..., Optional[ReturnT]], - *args: Any, - **kwargs: Any, - ) -> ReturnT: + condition: Callable[_ParamsT, _ReturnT], + *args: _ParamsT.args, + **kwargs: _ParamsT.kwargs, + ) -> _ReturnT: """Wait for a condition to become true, checking whenever state changes. If the condition is already true, return immediately. @@ -258,14 +260,43 @@ async def wait_for( Raises: The exception raised by the `condition` function, if any. """ - predicate = partial(condition, *args, **kwargs) - is_done = predicate() - while not is_done: + def predicate() -> _ReturnT: + return condition(*args, **kwargs) + + return await self._wait_for(condition=predicate, truthiness_to_wait_for=True) + + async def wait_for_not( + self, + condition: Callable[_ParamsT, _ReturnT], + *args: _ParamsT.args, + **kwargs: _ParamsT.kwargs, + ) -> _ReturnT: + """Like `wait_for()`, except wait for the condition to become false. + + See the documentation in `wait_for()`, especially the warning about condition + design. + + The advantage of having this separate method over just passing a wrapper lambda + as the condition to `wait_for()` yourself is that wrapper lambdas are hard to + test in the mock-heavy Decoy + Protocol Engine style. + """ + + def predicate() -> _ReturnT: + return condition(*args, **kwargs) + + return await self._wait_for(condition=predicate, truthiness_to_wait_for=False) + + async def _wait_for( + self, condition: Callable[[], _ReturnT], truthiness_to_wait_for: bool + ) -> _ReturnT: + current_value = condition() + + while bool(current_value) != truthiness_to_wait_for: await self._change_notifier.wait() - is_done = predicate() + current_value = condition() - return is_done + return current_value def _get_next_state(self) -> State: """Get a new instance of the state value object.""" diff --git a/api/src/opentrons/protocol_engine/state/tips.py b/api/src/opentrons/protocol_engine/state/tips.py index a2539ff45e7..f5d68d61ee5 100644 --- a/api/src/opentrons/protocol_engine/state/tips.py +++ b/api/src/opentrons/protocol_engine/state/tips.py @@ -7,11 +7,13 @@ from ..actions import ( Action, SucceedCommandAction, + FailCommandAction, ResetTipsAction, ) from ..commands import ( Command, LoadLabwareResult, + PickUpTip, PickUpTipResult, DropTipResult, DropTipInPlaceResult, @@ -20,6 +22,7 @@ PipetteConfigUpdateResultMixin, PipetteNozzleLayoutResultMixin, ) +from ..error_recovery_policy import ErrorRecoveryType from opentrons.hardware_control.nozzle_manager import NozzleMap @@ -71,7 +74,7 @@ def handle_action(self, action: Action) -> None: self._state.channels_by_pipette_id[pipette_id] = config.channels self._state.active_channels_by_pipette_id[pipette_id] = config.channels self._state.nozzle_map_by_pipette_id[pipette_id] = config.nozzle_map - self._handle_command(action.command) + self._handle_succeeded_command(action.command) if isinstance(action.private_result, PipetteNozzleLayoutResultMixin): pipette_id = action.private_result.pipette_id @@ -86,6 +89,9 @@ def handle_action(self, action: Action) -> None: pipette_id ] = self._state.channels_by_pipette_id[pipette_id] + elif isinstance(action, FailCommandAction): + self._handle_failed_command(action) + elif isinstance(action, ResetTipsAction): labware_id = action.labware_id @@ -94,7 +100,7 @@ def handle_action(self, action: Action) -> None: well_name ] = TipRackWellState.CLEAN - def _handle_command(self, command: Command) -> None: + def _handle_succeeded_command(self, command: Command) -> None: if ( isinstance(command.result, LoadLabwareResult) and command.result.definition.parameters.isTiprack @@ -124,6 +130,28 @@ def _handle_command(self, command: Command) -> None: pipette_id = command.params.pipetteId self._state.length_by_pipette_id.pop(pipette_id, None) + def _handle_failed_command( + self, + action: FailCommandAction, + ) -> None: + # If a pickUpTip command fails recoverably, mark the tips as used. This way, + # when the protocol is resumed and the Python Protocol API calls + # `get_next_tip()`, we'll move on to other tips as expected. + # + # We don't attempt this for nonrecoverable errors because maybe the failure + # was due to a bad labware ID or well name. + if ( + isinstance(action.running_command, PickUpTip) + and action.type != ErrorRecoveryType.FAIL_RUN + ): + self._set_used_tips( + pipette_id=action.running_command.params.pipetteId, + labware_id=action.running_command.params.labwareId, + well_name=action.running_command.params.wellName, + ) + # Note: We're logically removing the tip from the tip rack, + # but we're not logically updating the pipette to have that tip on it. + def _set_used_tips( # noqa: C901 self, pipette_id: str, well_name: str, labware_id: str ) -> None: diff --git a/api/src/opentrons/protocol_runner/legacy_command_mapper.py b/api/src/opentrons/protocol_runner/legacy_command_mapper.py index ea212123cb3..e835a6af8e6 100644 --- a/api/src/opentrons/protocol_runner/legacy_command_mapper.py +++ b/api/src/opentrons/protocol_runner/legacy_command_mapper.py @@ -265,6 +265,7 @@ def map_command( # noqa: C901 results.append( pe_actions.FailCommandAction( command_id=running_command.id, + running_command=running_command, error_id=ModelUtils.generate_id(), failed_at=now, error=LegacyContextCommandError(command_error), diff --git a/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py b/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py index 3b296067a0d..6ac0e9aaaf0 100644 --- a/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py +++ b/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py @@ -276,7 +276,7 @@ def test_pick_up_tip( origin=WellOrigin.TOP, offset=WellOffset(x=3, y=2, z=1) ), ), - mock_engine_client.pick_up_tip( + mock_engine_client.pick_up_tip_wait_for_recovery( pipette_id="abc123", labware_id="labware-id", well_name="well-name", diff --git a/api/tests/opentrons/protocol_engine/execution/test_command_executor.py b/api/tests/opentrons/protocol_engine/execution/test_command_executor.py index 94b7ad25509..2cd753093f9 100644 --- a/api/tests/opentrons/protocol_engine/execution/test_command_executor.py +++ b/api/tests/opentrons/protocol_engine/execution/test_command_executor.py @@ -500,6 +500,7 @@ def _ImplementationCls(self) -> Type[_TestCommandImpl]: action_dispatcher.dispatch( FailCommandAction( command_id="command-id", + running_command=running_command, error_id="error-id", failed_at=datetime(year=2023, month=3, day=3), error=expected_error, diff --git a/api/tests/opentrons/protocol_engine/state/test_command_state.py b/api/tests/opentrons/protocol_engine/state/test_command_state.py index 001b1b7640c..8f1ea39fc00 100644 --- a/api/tests/opentrons/protocol_engine/state/test_command_state.py +++ b/api/tests/opentrons/protocol_engine/state/test_command_state.py @@ -6,10 +6,14 @@ from datetime import datetime -from opentrons_shared_data.errors import PythonException +import pytest -from opentrons.protocol_engine import actions, commands +from opentrons_shared_data.errors import ErrorCodes, PythonException + +from opentrons.ordered_set import OrderedSet +from opentrons.protocol_engine import actions, commands, errors from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryType +from opentrons.protocol_engine.notes.notes import CommandNote from opentrons.protocol_engine.state.commands import CommandStore, CommandView from opentrons.protocol_engine.state.config import Config from opentrons.protocol_engine.types import DeckType @@ -23,6 +27,269 @@ def _make_config() -> Config: ) +@pytest.mark.parametrize("error_recovery_type", ErrorRecoveryType) +def test_command_failure(error_recovery_type: ErrorRecoveryType) -> None: + """It should store an error and mark the command if it fails.""" + subject = CommandStore(is_door_open=False, config=_make_config()) + subject_view = CommandView(subject.state) + + command_id = "command-id" + command_key = "command-key" + created_at = datetime(year=2021, month=1, day=1) + started_at = datetime(year=2022, month=2, day=2) + failed_at = datetime(year=2023, month=3, day=3) + error_id = "error-id" + notes = [ + CommandNote( + noteKind="noteKind", + shortMessage="shortMessage", + longMessage="longMessage", + source="source", + ) + ] + + params = commands.CommentParams(message="No comment.") + + subject.handle_action( + actions.QueueCommandAction( + command_id=command_id, + created_at=created_at, + request=commands.CommentCreate(params=params, key=command_key), + request_hash=None, + ) + ) + subject.handle_action( + actions.RunCommandAction(command_id=command_id, started_at=started_at) + ) + subject.handle_action( + actions.FailCommandAction( + command_id=command_id, + running_command=subject_view.get(command_id), + error_id=error_id, + failed_at=failed_at, + error=errors.ProtocolEngineError(message="oh no"), + notes=notes, + type=error_recovery_type, + ) + ) + + expected_error_occurrence = errors.ErrorOccurrence( + id=error_id, + errorType="ProtocolEngineError", + createdAt=failed_at, + detail="oh no", + errorCode=ErrorCodes.GENERAL_ERROR.value.code, + ) + expected_failed_command = commands.Comment( + id=command_id, + key=command_key, + commandType="comment", + createdAt=created_at, + startedAt=started_at, + completedAt=failed_at, + status=commands.CommandStatus.FAILED, + params=params, + result=None, + error=expected_error_occurrence, + notes=notes, + ) + + assert subject_view.get("command-id") == expected_failed_command + + +def test_command_failure_clears_queues() -> None: + """It should clear the command queue on command failure.""" + subject = CommandStore(config=_make_config(), is_door_open=False) + subject_view = CommandView(subject.state) + + queue_1 = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), key="command-key-1" + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-1", + ) + subject.handle_action(queue_1) + queue_2 = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), key="command-key-2" + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-2", + ) + subject.handle_action(queue_2) + + run_1 = actions.RunCommandAction( + command_id="command-id-1", + started_at=datetime(year=2022, month=2, day=2), + ) + subject.handle_action(run_1) + fail_1 = actions.FailCommandAction( + command_id="command-id-1", + running_command=subject_view.get("command-id-1"), + error_id="error-id", + failed_at=datetime(year=2023, month=3, day=3), + error=errors.ProtocolEngineError(message="oh no"), + notes=[ + CommandNote( + noteKind="noteKind", + shortMessage="shortMessage", + longMessage="longMessage", + source="source", + ) + ], + type=ErrorRecoveryType.FAIL_RUN, + ) + subject.handle_action(fail_1) + + assert [(c.id, c.status) for c in subject_view.get_all()] == [ + ("command-id-1", commands.CommandStatus.FAILED), + ("command-id-2", commands.CommandStatus.FAILED), + ] + assert subject_view.get_running_command_id() is None + assert subject_view.get_queue_ids() == OrderedSet() + assert subject_view.get_next_to_execute() is None + + +def test_setup_command_failure_only_clears_setup_command_queue() -> None: + """It should clear only the setup command queue for a failed setup command. + + This test queues up a non-setup command followed by two setup commands, + then runs and fails the first setup command. + """ + subject = CommandStore(is_door_open=False, config=_make_config()) + subject_view = CommandView(subject.state) + + queue_1 = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), key="command-key-1" + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-1", + ) + subject.handle_action(queue_1) + queue_2_setup = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), + intent=commands.CommandIntent.SETUP, + key="command-key-2", + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-2", + ) + subject.handle_action(queue_2_setup) + queue_3_setup = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), + intent=commands.CommandIntent.SETUP, + key="command-key-3", + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-3", + ) + subject.handle_action(queue_3_setup) + + run_2_setup = actions.RunCommandAction( + command_id="command-id-2", + started_at=datetime(year=2022, month=2, day=2), + ) + subject.handle_action(run_2_setup) + fail_2_setup = actions.FailCommandAction( + command_id="command-id-2", + running_command=subject_view.get("command-id-2"), + error_id="error-id", + failed_at=datetime(year=2023, month=3, day=3), + error=errors.ProtocolEngineError(message="oh no"), + notes=[ + CommandNote( + noteKind="noteKind", + shortMessage="shortMessage", + longMessage="longMessage", + source="source", + ) + ], + type=ErrorRecoveryType.FAIL_RUN, + ) + subject.handle_action(fail_2_setup) + + assert [(c.id, c.status) for c in subject_view.get_all()] == [ + ("command-id-1", commands.CommandStatus.QUEUED), + ("command-id-2", commands.CommandStatus.FAILED), + ("command-id-3", commands.CommandStatus.FAILED), + ] + assert subject_view.get_running_command_id() is None + + subject.handle_action( + actions.PlayAction(requested_at=datetime.now(), deck_configuration=None) + ) + assert subject_view.get_next_to_execute() == "command-id-1" + + +def test_nonfatal_command_failure() -> None: + """Test the command queue if a command fails recoverably. + + Commands that were after the failed command in the queue should be left in + the queue. + + The queue status should be "awaiting-recovery." + """ + subject = CommandStore(is_door_open=False, config=_make_config()) + subject_view = CommandView(subject.state) + + queue_1 = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), key="command-key-1" + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-1", + ) + subject.handle_action(queue_1) + queue_2 = actions.QueueCommandAction( + request=commands.WaitForResumeCreate( + params=commands.WaitForResumeParams(), key="command-key-2" + ), + request_hash=None, + created_at=datetime(year=2021, month=1, day=1), + command_id="command-id-2", + ) + subject.handle_action(queue_2) + + run_1 = actions.RunCommandAction( + command_id="command-id-1", + started_at=datetime(year=2022, month=2, day=2), + ) + subject.handle_action(run_1) + fail_1 = actions.FailCommandAction( + command_id="command-id-1", + running_command=subject_view.get("command-id-1"), + error_id="error-id", + failed_at=datetime(year=2023, month=3, day=3), + error=errors.ProtocolEngineError(message="oh no"), + notes=[ + CommandNote( + noteKind="noteKind", + shortMessage="shortMessage", + longMessage="longMessage", + source="source", + ) + ], + type=ErrorRecoveryType.WAIT_FOR_RECOVERY, + ) + subject.handle_action(fail_1) + + assert [(c.id, c.status) for c in subject_view.get_all()] == [ + ("command-id-1", commands.CommandStatus.FAILED), + ("command-id-2", commands.CommandStatus.QUEUED), + ] + assert subject_view.get_running_command_id() is None + + def test_error_recovery_type_tracking() -> None: """It should keep track of each failed command's error recovery type.""" subject = CommandStore(config=_make_config(), is_door_open=False) @@ -50,9 +317,11 @@ def test_error_recovery_type_tracking() -> None: subject.handle_action( actions.RunCommandAction(command_id="c1", started_at=datetime.now()) ) + running_command_1 = CommandView(subject.state).get("c1") subject.handle_action( actions.FailCommandAction( command_id="c1", + running_command=running_command_1, error_id="c1-error", failed_at=datetime.now(), error=PythonException(RuntimeError("new sheriff in town")), @@ -63,9 +332,11 @@ def test_error_recovery_type_tracking() -> None: subject.handle_action( actions.RunCommandAction(command_id="c2", started_at=datetime.now()) ) + running_command_2 = CommandView(subject.state).get("c2") subject.handle_action( actions.FailCommandAction( command_id="c2", + running_command=running_command_2, error_id="c2-error", failed_at=datetime.now(), error=PythonException(RuntimeError("new sheriff in town")), @@ -77,3 +348,89 @@ def test_error_recovery_type_tracking() -> None: view = CommandView(subject.state) assert view.get_error_recovery_type("c1") == ErrorRecoveryType.WAIT_FOR_RECOVERY assert view.get_error_recovery_type("c2") == ErrorRecoveryType.FAIL_RUN + + +def test_get_recovery_in_progress_for_command() -> None: + """It should return whether error recovery is in progress for the given command.""" + subject = CommandStore(config=_make_config(), is_door_open=False) + subject_view = CommandView(subject.state) + + queue_1 = actions.QueueCommandAction( + "c1", + created_at=datetime.now(), + request=commands.CommentCreate(params=commands.CommentParams(message="")), + request_hash=None, + ) + subject.handle_action(queue_1) + run_1 = actions.RunCommandAction(command_id="c1", started_at=datetime.now()) + subject.handle_action(run_1) + fail_1 = actions.FailCommandAction( + command_id="c1", + error_id="c1-error", + failed_at=datetime.now(), + error=PythonException(RuntimeError()), + notes=[], + type=ErrorRecoveryType.WAIT_FOR_RECOVERY, + running_command=subject_view.get("c1"), + ) + subject.handle_action(fail_1) + + # c1 failed recoverably and we're currently recovering from it. + assert subject_view.get_recovery_in_progress_for_command("c1") + + resume_from_1_recovery = actions.ResumeFromRecoveryAction() + subject.handle_action(resume_from_1_recovery) + + # c1 failed recoverably, but we've already completed its recovery. + assert not subject_view.get_recovery_in_progress_for_command("c1") + + queue_2 = actions.QueueCommandAction( + "c2", + created_at=datetime.now(), + request=commands.CommentCreate(params=commands.CommentParams(message="")), + request_hash=None, + ) + subject.handle_action(queue_2) + run_2 = actions.RunCommandAction(command_id="c2", started_at=datetime.now()) + subject.handle_action(run_2) + fail_2 = actions.FailCommandAction( + command_id="c2", + error_id="c2-error", + failed_at=datetime.now(), + error=PythonException(RuntimeError()), + notes=[], + type=ErrorRecoveryType.WAIT_FOR_RECOVERY, + running_command=subject_view.get("c2"), + ) + subject.handle_action(fail_2) + + # c2 failed recoverably and we're currently recovering from it. + assert subject_view.get_recovery_in_progress_for_command("c2") + # ...and that means we're *not* currently recovering from c1, + # even though it failed recoverably before. + assert not subject_view.get_recovery_in_progress_for_command("c1") + + resume_from_2_recovery = actions.ResumeFromRecoveryAction() + subject.handle_action(resume_from_2_recovery) + queue_3 = actions.QueueCommandAction( + "c3", + created_at=datetime.now(), + request=commands.CommentCreate(params=commands.CommentParams(message="")), + request_hash=None, + ) + subject.handle_action(queue_3) + run_3 = actions.RunCommandAction(command_id="c3", started_at=datetime.now()) + subject.handle_action(run_3) + fail_3 = actions.FailCommandAction( + command_id="c3", + error_id="c3-error", + failed_at=datetime.now(), + error=PythonException(RuntimeError()), + notes=[], + type=ErrorRecoveryType.FAIL_RUN, + running_command=subject_view.get("c3"), + ) + subject.handle_action(fail_3) + + # c3 failed, but not recoverably. + assert not subject_view.get_recovery_in_progress_for_command("c2") diff --git a/api/tests/opentrons/protocol_engine/state/test_command_store_old.py b/api/tests/opentrons/protocol_engine/state/test_command_store_old.py index 7afde4a6e4b..a859ae7573b 100644 --- a/api/tests/opentrons/protocol_engine/state/test_command_store_old.py +++ b/api/tests/opentrons/protocol_engine/state/test_command_store_old.py @@ -14,12 +14,10 @@ from opentrons.ordered_set import OrderedSet from opentrons.protocol_engine.actions.actions import RunCommandAction -from opentrons.protocol_engine.notes.notes import CommandNote from opentrons.types import MountType, DeckSlotName from opentrons.hardware_control.types import DoorState from opentrons.protocol_engine import commands, errors -from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryType from opentrons.protocol_engine.types import DeckSlotLocation, DeckType, WellLocation from opentrons.protocol_engine.state import Config from opentrons.protocol_engine.state.commands import ( @@ -33,7 +31,6 @@ from opentrons.protocol_engine.actions import ( QueueCommandAction, SucceedCommandAction, - FailCommandAction, PlayAction, PauseAction, PauseSource, @@ -86,6 +83,7 @@ def test_initial_state( finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, latest_command_hash=None, stopped_by_estop=False, ) @@ -429,321 +427,6 @@ def test_running_command_id() -> None: assert subject.state.command_history.get_running_command() is None -def test_command_failure_clears_queues() -> None: - """It should clear the command queue on command failure.""" - queue_1 = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=commands.WaitForResumeParams(), key="command-key-1" - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-1", - ) - queue_2 = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=commands.WaitForResumeParams(), key="command-key-2" - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-2", - ) - run_1 = RunCommandAction( - command_id="command-id-1", - started_at=datetime(year=2022, month=2, day=2), - ) - fail_1 = FailCommandAction( - command_id="command-id-1", - error_id="error-id", - failed_at=datetime(year=2023, month=3, day=3), - error=errors.ProtocolEngineError(message="oh no"), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - type=ErrorRecoveryType.FAIL_RUN, - ) - - expected_failed_1 = commands.WaitForResume( - id="command-id-1", - key="command-key-1", - error=errors.ErrorOccurrence( - id="error-id", - createdAt=datetime(year=2023, month=3, day=3), - errorCode=ErrorCodes.GENERAL_ERROR.value.code, - errorType="ProtocolEngineError", - detail="oh no", - ), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - createdAt=datetime(year=2021, month=1, day=1), - startedAt=datetime(year=2022, month=2, day=2), - completedAt=datetime(year=2023, month=3, day=3), - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.FAILED, - ) - expected_failed_2 = commands.WaitForResume( - id="command-id-2", - key="command-key-2", - error=None, - createdAt=datetime(year=2021, month=1, day=1), - completedAt=datetime(year=2023, month=3, day=3), - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.FAILED, - ) - - subject = CommandStore(is_door_open=False, config=_make_config()) - - subject.handle_action(queue_1) - subject.handle_action(queue_2) - subject.handle_action(run_1) - subject.handle_action(fail_1) - - assert subject.state.command_history.get_running_command() is None - assert subject.state.command_history.get_queue_ids() == OrderedSet() - assert subject.state.command_history.get_all_ids() == [ - "command-id-1", - "command-id-2", - ] - assert subject.state.command_history.get("command-id-1") == CommandEntry( - index=0, command=expected_failed_1 - ) - assert subject.state.command_history.get("command-id-2") == CommandEntry( - index=1, command=expected_failed_2 - ) - - -def test_setup_command_failure_only_clears_setup_command_queue() -> None: - """It should clear only the setup command queue for a failed setup command. - - This test queues up a non-setup command followed by two setup commands, - then attempts to run and fail the first setup command and - """ - cmd_1_non_setup = commands.WaitForResume( - id="command-id-1", - key="command-key-1", - createdAt=datetime(year=2021, month=1, day=1), - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.QUEUED, - ) - queue_action_1_non_setup = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=cmd_1_non_setup.params, key="command-key-1" - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-1", - ) - queue_action_2_setup = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=commands.WaitForResumeParams(), - intent=commands.CommandIntent.SETUP, - key="command-key-2", - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-2", - ) - queue_action_3_setup = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=commands.WaitForResumeParams(), - intent=commands.CommandIntent.SETUP, - key="command-key-3", - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-3", - ) - - run_action_cmd_2 = RunCommandAction( - command_id="command-id-2", - started_at=datetime(year=2022, month=2, day=2), - ) - failed_action_cmd_2 = FailCommandAction( - command_id="command-id-2", - error_id="error-id", - failed_at=datetime(year=2023, month=3, day=3), - error=errors.ProtocolEngineError(message="oh no"), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - type=ErrorRecoveryType.FAIL_RUN, - ) - expected_failed_cmd_2 = commands.WaitForResume( - id="command-id-2", - key="command-key-2", - error=errors.ErrorOccurrence( - id="error-id", - createdAt=datetime(year=2023, month=3, day=3), - errorType="ProtocolEngineError", - detail="oh no", - errorCode=ErrorCodes.GENERAL_ERROR.value.code, - ), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - createdAt=datetime(year=2021, month=1, day=1), - startedAt=datetime(year=2022, month=2, day=2), - completedAt=datetime(year=2023, month=3, day=3), - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.FAILED, - intent=commands.CommandIntent.SETUP, - ) - expected_failed_cmd_3 = commands.WaitForResume( - id="command-id-3", - key="command-key-3", - error=None, - createdAt=datetime(year=2021, month=1, day=1), - completedAt=datetime(year=2023, month=3, day=3), - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.FAILED, - intent=commands.CommandIntent.SETUP, - ) - - subject = CommandStore(is_door_open=False, config=_make_config()) - - subject.handle_action(queue_action_1_non_setup) - subject.handle_action(queue_action_2_setup) - subject.handle_action(queue_action_3_setup) - subject.handle_action(run_action_cmd_2) - subject.handle_action(failed_action_cmd_2) - - assert subject.state.command_history.get_running_command() is None - assert subject.state.command_history.get_setup_queue_ids() == OrderedSet() - assert subject.state.command_history.get_queue_ids() == OrderedSet(["command-id-1"]) - assert subject.state.command_history.get_all_ids() == [ - "command-id-1", - "command-id-2", - "command-id-3", - ] - assert subject.state.command_history.get("command-id-1") == CommandEntry( - index=0, command=cmd_1_non_setup - ) - assert subject.state.command_history.get("command-id-2") == CommandEntry( - index=1, command=expected_failed_cmd_2 - ) - assert subject.state.command_history.get("command-id-3") == CommandEntry( - index=2, command=expected_failed_cmd_3 - ) - - -def test_nonfatal_command_failure() -> None: - """Test the command queue if a command fails recoverably. - - Commands that were after the failed command in the queue should be left in - the queue. - """ - queue_1 = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=commands.WaitForResumeParams(), key="command-key-1" - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-1", - ) - queue_2 = QueueCommandAction( - request=commands.WaitForResumeCreate( - params=commands.WaitForResumeParams(), key="command-key-2" - ), - request_hash=None, - created_at=datetime(year=2021, month=1, day=1), - command_id="command-id-2", - ) - run_1 = RunCommandAction( - command_id="command-id-1", - started_at=datetime(year=2022, month=2, day=2), - ) - fail_1 = FailCommandAction( - command_id="command-id-1", - error_id="error-id", - failed_at=datetime(year=2023, month=3, day=3), - error=errors.ProtocolEngineError(message="oh no"), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - type=ErrorRecoveryType.WAIT_FOR_RECOVERY, - ) - - expected_failed_1 = commands.WaitForResume( - id="command-id-1", - key="command-key-1", - error=errors.ErrorOccurrence( - id="error-id", - createdAt=datetime(year=2023, month=3, day=3), - errorCode=ErrorCodes.GENERAL_ERROR.value.code, - errorType="ProtocolEngineError", - detail="oh no", - ), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - createdAt=datetime(year=2021, month=1, day=1), - startedAt=datetime(year=2022, month=2, day=2), - completedAt=datetime(year=2023, month=3, day=3), - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.FAILED, - ) - expected_queued_2 = commands.WaitForResume( - id="command-id-2", - key="command-key-2", - error=None, - createdAt=datetime(year=2021, month=1, day=1), - startedAt=None, - completedAt=None, - params=commands.WaitForResumeParams(), - status=commands.CommandStatus.QUEUED, - ) - - subject = CommandStore(is_door_open=False, config=_make_config()) - - subject.handle_action(queue_1) - subject.handle_action(queue_2) - subject.handle_action(run_1) - subject.handle_action(fail_1) - - assert subject.state.command_history.get_running_command() is None - assert subject.state.command_history.get_queue_ids() == OrderedSet(["command-id-2"]) - assert subject.state.command_history.get_all_ids() == [ - "command-id-1", - "command-id-2", - ] - assert subject.state.command_history.get("command-id-1") == CommandEntry( - index=0, command=expected_failed_1 - ) - assert subject.state.command_history.get("command-id-2") == CommandEntry( - index=1, command=expected_queued_2 - ) - - def test_command_store_keeps_commands_in_queue_order() -> None: """It should keep commands in the order they were originally enqueued.""" command_create_1_non_setup = commands.CommentCreate( @@ -834,6 +517,7 @@ def test_command_store_handles_pause_action(pause_source: PauseSource) -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, latest_command_hash=None, stopped_by_estop=False, ) @@ -859,6 +543,7 @@ def test_command_store_handles_play_action(pause_source: PauseSource) -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=datetime(year=2021, month=1, day=1), latest_command_hash=None, stopped_by_estop=False, @@ -890,6 +575,7 @@ def test_command_store_handles_finish_action() -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=datetime(year=2021, month=1, day=1), latest_command_hash=None, stopped_by_estop=False, @@ -936,6 +622,7 @@ def test_command_store_handles_stop_action(from_estop: bool) -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=datetime(year=2021, month=1, day=1), latest_command_hash=None, stopped_by_estop=from_estop, @@ -966,6 +653,7 @@ def test_command_store_cannot_restart_after_should_stop() -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=None, latest_command_hash=None, stopped_by_estop=False, @@ -1098,6 +786,7 @@ def test_command_store_wraps_unknown_errors() -> None: run_started_at=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, latest_command_hash=None, stopped_by_estop=False, ) @@ -1159,6 +848,7 @@ def __init__(self, message: str) -> None: ), failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=None, latest_command_hash=None, stopped_by_estop=False, @@ -1191,6 +881,7 @@ def test_command_store_ignores_stop_after_graceful_finish() -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=datetime(year=2021, month=1, day=1), latest_command_hash=None, stopped_by_estop=False, @@ -1223,6 +914,7 @@ def test_command_store_ignores_finish_after_non_graceful_stop() -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=datetime(year=2021, month=1, day=1), latest_command_hash=None, stopped_by_estop=False, @@ -1233,102 +925,6 @@ def test_command_store_ignores_finish_after_non_graceful_stop() -> None: assert subject.state.command_history.get_setup_queue_ids() == OrderedSet() -def test_command_store_handles_command_failed() -> None: - """It should store an error and mark the command if it fails.""" - error_recovery_type = ErrorRecoveryType.FAIL_RUN - - expected_error_occurrence = errors.ErrorOccurrence( - id="error-id", - errorType="ProtocolEngineError", - createdAt=datetime(year=2023, month=3, day=3), - detail="oh no", - errorCode=ErrorCodes.GENERAL_ERROR.value.code, - ) - - expected_failed_command = commands.Comment( - id="command-id", - commandType="comment", - key="command-key", - createdAt=datetime(year=2021, month=1, day=1), - startedAt=datetime(year=2022, month=2, day=2), - completedAt=expected_error_occurrence.createdAt, - status=commands.CommandStatus.FAILED, - params=commands.CommentParams(message="hello, world"), - result=None, - error=expected_error_occurrence, - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - ) - - subject = CommandStore(is_door_open=False, config=_make_config()) - - subject.handle_action( - QueueCommandAction( - command_id=expected_failed_command.id, - created_at=expected_failed_command.createdAt, - request=commands.CommentCreate( - params=expected_failed_command.params, key=expected_failed_command.key - ), - request_hash=None, - ) - ) - subject.handle_action( - RunCommandAction( - command_id=expected_failed_command.id, - # Ignore arg-type errors because we know this isn't None. - started_at=expected_failed_command.startedAt, # type: ignore[arg-type] - ) - ) - subject.handle_action( - FailCommandAction( - command_id=expected_failed_command.id, - error_id=expected_error_occurrence.id, - failed_at=expected_error_occurrence.createdAt, - error=errors.ProtocolEngineError(message="oh no"), - notes=[ - CommandNote( - noteKind="noteKind", - shortMessage="shortMessage", - longMessage="longMessage", - source="source", - ) - ], - type=error_recovery_type, - ) - ) - - failed_command_entry = CommandEntry(index=0, command=expected_failed_command) - command_history = CommandHistory() - command_history._add("command-id", failed_command_entry) - command_history._set_terminal_command_id("command-id") - - assert subject.state == CommandState( - command_history=command_history, - queue_status=QueueStatus.SETUP, - run_result=None, - run_completed_at=None, - is_door_blocking=False, - run_error=None, - finish_error=None, - failed_command=failed_command_entry, - command_error_recovery_types={expected_failed_command.id: error_recovery_type}, - run_started_at=None, - latest_command_hash=None, - stopped_by_estop=False, - ) - assert subject.state.command_history.get_running_command() is None - assert subject.state.command_history.get_all_ids() == ["command-id"] - assert subject.state.command_history.get_queue_ids() == OrderedSet() - assert subject.state.command_history.get_setup_queue_ids() == OrderedSet() - assert subject.state.command_history.get("command-id") == failed_command_entry - - def test_handles_hardware_stopped() -> None: """It should mark the hardware as stopped on HardwareStoppedAction.""" subject = CommandStore(is_door_open=False, config=_make_config()) @@ -1347,6 +943,7 @@ def test_handles_hardware_stopped() -> None: finish_error=None, failed_command=None, command_error_recovery_types={}, + recovery_target_command_id=None, run_started_at=None, latest_command_hash=None, stopped_by_estop=False, diff --git a/api/tests/opentrons/protocol_engine/state/test_command_view_old.py b/api/tests/opentrons/protocol_engine/state/test_command_view_old.py index 64d7670f662..a9b5fc92cc3 100644 --- a/api/tests/opentrons/protocol_engine/state/test_command_view_old.py +++ b/api/tests/opentrons/protocol_engine/state/test_command_view_old.py @@ -58,6 +58,7 @@ def get_command_view( run_error: Optional[errors.ErrorOccurrence] = None, failed_command: Optional[CommandEntry] = None, command_error_recovery_types: Optional[Dict[str, ErrorRecoveryType]] = None, + recovery_target_command_id: Optional[str] = None, finish_error: Optional[errors.ErrorOccurrence] = None, commands: Sequence[cmd.Command] = (), latest_command_hash: Optional[str] = None, @@ -90,6 +91,7 @@ def get_command_view( finish_error=finish_error, failed_command=failed_command, command_error_recovery_types=command_error_recovery_types or {}, + recovery_target_command_id=recovery_target_command_id, run_started_at=run_started_at, latest_command_hash=latest_command_hash, stopped_by_estop=False, diff --git a/api/tests/opentrons/protocol_engine/state/test_state_store.py b/api/tests/opentrons/protocol_engine/state/test_state_store.py index dd32bbec591..170f05bb4b9 100644 --- a/api/tests/opentrons/protocol_engine/state/test_state_store.py +++ b/api/tests/opentrons/protocol_engine/state/test_state_store.py @@ -1,5 +1,5 @@ """Tests for the top-level StateStore/StateView.""" -from typing import Callable, Optional +from typing import Callable, Union from datetime import datetime import pytest @@ -80,47 +80,52 @@ def test_notify_on_state_change( decoy.verify(change_notifier.notify(), times=1) -async def test_wait_for_state( +async def test_wait_for( decoy: Decoy, change_notifier: ChangeNotifier, subject: StateStore, ) -> None: """It should return an awaitable that signals state changes.""" - check_condition: Callable[..., Optional[str]] = decoy.mock(name="check_condition") + check_condition: Callable[..., Union[str, int]] = decoy.mock(name="check_condition") decoy.when(check_condition("foo", bar="baz")).then_return( - None, - None, + 0, + 0, "hello world", ) - result = await subject.wait_for(check_condition, "foo", bar="baz") assert result == "hello world" + decoy.verify(await change_notifier.wait(), times=2) + decoy.reset() + + decoy.when(check_condition("foo", bar="baz")).then_return( + "hello world", + "hello world again", + 0, + ) + result = await subject.wait_for_not(check_condition, "foo", bar="baz") + assert result == 0 decoy.verify(await change_notifier.wait(), times=2) -async def test_wait_for_state_short_circuit( +async def test_wait_for_already_satisfied( decoy: Decoy, subject: StateStore, change_notifier: ChangeNotifier, ) -> None: - """It should short-circuit the change notifier if condition is satisfied.""" - check_condition: Callable[..., Optional[str]] = decoy.mock(name="check_condition") + """It should return immediately and skip the change notifier.""" + check_condition: Callable[..., Union[str, int]] = decoy.mock(name="check_condition") decoy.when(check_condition("foo", bar="baz")).then_return("hello world") - result = await subject.wait_for(check_condition, "foo", bar="baz") assert result == "hello world" - decoy.verify(await change_notifier.wait(), times=0) - -async def test_wait_for_already_true(decoy: Decoy, subject: StateStore) -> None: - """It should signal immediately if condition is already met.""" - check_condition = decoy.mock(name="check_condition") - decoy.when(check_condition()).then_return(True) - await subject.wait_for(check_condition) + decoy.when(check_condition("foo", bar="baz")).then_return(0) + result = await subject.wait_for_not(check_condition, "foo", bar="baz") + assert result == 0 + decoy.verify(await change_notifier.wait(), times=0) async def test_wait_for_raises(decoy: Decoy, subject: StateStore) -> None: @@ -131,3 +136,6 @@ async def test_wait_for_raises(decoy: Decoy, subject: StateStore) -> None: with pytest.raises(ValueError, match="oh no"): await subject.wait_for(check_condition) + + with pytest.raises(ValueError, match="oh no"): + await subject.wait_for_not(check_condition) diff --git a/api/tests/opentrons/protocol_engine/test_protocol_engine.py b/api/tests/opentrons/protocol_engine/test_protocol_engine.py index 2191b1c4954..dd96b8d968a 100644 --- a/api/tests/opentrons/protocol_engine/test_protocol_engine.py +++ b/api/tests/opentrons/protocol_engine/test_protocol_engine.py @@ -2,6 +2,7 @@ import inspect from datetime import datetime from typing import Any +from unittest.mock import sentinel import pytest from decoy import Decoy @@ -333,6 +334,99 @@ def _stub_completed(*_a: object, **_k: object) -> bool: assert result == completed +async def test_add_and_execute_command_wait_for_recovery( + decoy: Decoy, + state_store: StateStore, + action_dispatcher: ActionDispatcher, + model_utils: ModelUtils, + subject: ProtocolEngine, +) -> None: + """It should add and execute a command from a request.""" + created_at = datetime(year=2021, month=1, day=1) + original_request = commands.WaitForResumeCreate( + params=commands.WaitForResumeParams() + ) + standardized_request = commands.HomeCreate(params=commands.HomeParams()) + queued = commands.Home( + id="command-id", + key="command-key", + status=commands.CommandStatus.QUEUED, + createdAt=created_at, + params=commands.HomeParams(), + ) + completed = commands.Home( + id="command-id", + key="command-key", + status=commands.CommandStatus.SUCCEEDED, + createdAt=created_at, + params=commands.HomeParams(), + ) + + robot_type: RobotType = "OT-3 Standard" + decoy.when(state_store.config).then_return( + Config(robot_type=robot_type, deck_type=DeckType.OT3_STANDARD) + ) + + decoy.when( + slot_standardization.standardize_command(original_request, robot_type) + ).then_return(standardized_request) + + decoy.when(model_utils.generate_id()).then_return("command-id") + decoy.when(model_utils.get_timestamp()).then_return(created_at) + + def _stub_queued(*_a: object, **_k: object) -> None: + decoy.when(state_store.commands.get("command-id")).then_return(queued) + + def _stub_completed(*_a: object, **_k: object) -> bool: + decoy.when(state_store.commands.get("command-id")).then_return(completed) + return True + + decoy.when( + state_store.commands.validate_action_allowed( + QueueCommandAction( + command_id="command-id", + created_at=created_at, + request=standardized_request, + request_hash=None, + ) + ) + ).then_return( + QueueCommandAction( + command_id="command-id-validated", + created_at=created_at, + request=standardized_request, + request_hash=None, + ) + ) + + decoy.when( + action_dispatcher.dispatch( + QueueCommandAction( + command_id="command-id-validated", + created_at=created_at, + request=standardized_request, + request_hash=None, + ) + ) + ).then_do(_stub_queued) + + decoy.when( + await state_store.wait_for( + condition=state_store.commands.get_command_is_final, + command_id="command-id", + ), + ).then_do(_stub_completed) + + result = await subject.add_and_execute_command_wait_for_recovery(original_request) + assert result == completed + decoy.verify( + await state_store.wait_for_not( + state_store.commands.get_recovery_in_progress_for_command, + "command-id", + ) + ) + + def test_play( decoy: Decoy, state_store: StateStore, @@ -764,6 +858,8 @@ async def test_estop_during_command( """It should be able to stop the engine.""" timestamp = datetime(2021, 1, 1, 0, 0) command_id = "command_fake_id" + running_command = sentinel.running_command + queued_command = sentinel.queued_command error_id = "fake_error_id" fake_command_set = OrderedSet(["fake-id-1", "fake-id-1"]) @@ -771,10 +867,15 @@ async def test_estop_during_command( decoy.when(model_utils.generate_id()).then_return(error_id) decoy.when(state_store.commands.get_is_stopped()).then_return(False) decoy.when(state_store.commands.get_running_command_id()).then_return(command_id) + decoy.when(state_store.commands.get(command_id)).then_return(running_command) decoy.when(state_store.commands.get_queue_ids()).then_return(fake_command_set) + decoy.when(state_store.commands.get(fake_command_set.head())).then_return( + queued_command + ) expected_action = FailCommandAction( command_id=command_id, + running_command=running_command, error_id=error_id, failed_at=timestamp, error=EStopActivatedError(message="Estop Activated"), @@ -783,6 +884,7 @@ async def test_estop_during_command( ) expected_action_2 = FailCommandAction( command_id=fake_command_set.head(), + running_command=queued_command, error_id=error_id, failed_at=timestamp, error=EStopActivatedError(message="Estop Activated"), diff --git a/api/tests/opentrons/protocol_runner/test_legacy_command_mapper.py b/api/tests/opentrons/protocol_runner/test_legacy_command_mapper.py index 23b7ecac3bb..f0412878856 100644 --- a/api/tests/opentrons/protocol_runner/test_legacy_command_mapper.py +++ b/api/tests/opentrons/protocol_runner/test_legacy_command_mapper.py @@ -156,6 +156,7 @@ def test_map_after_with_error_command() -> None: assert result == [ pe_actions.FailCommandAction( command_id="command.COMMENT-0", + running_command=matchers.Anything(), error_id=matchers.IsA(str), failed_at=matchers.IsA(datetime), error=matchers.ErrorMatching( @@ -257,6 +258,7 @@ def test_command_stack() -> None: ), pe_actions.FailCommandAction( command_id="command.COMMENT-1", + running_command=matchers.Anything(), error_id=matchers.IsA(str), failed_at=matchers.IsA(datetime), error=matchers.ErrorMatching(LegacyContextCommandError, "oh no"),