Skip to content

Commit

Permalink
feat(api): Pause when pick_up_tip() errors in a Python protocol (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxColoring authored Apr 9, 2024
1 parent 19d88ce commit 1819b8c
Show file tree
Hide file tree
Showing 20 changed files with 810 additions and 478 deletions.
7 changes: 6 additions & 1 deletion api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,18 @@ def pick_up_tip(
well_name=well_name,
well_location=well_location,
)
self._engine_client.pick_up_tip(

self._engine_client.pick_up_tip_wait_for_recovery(
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)

# Set the "last location" unconditionally, even if the command failed
# and was recovered from and we don't know if the pipette is physically here.
# This isn't used for path planning, but rather for implicit destination
# selection like in `pipette.aspirate(location=None)`.
self._protocol_core.set_last_location(location=location, mount=self.get_mount())

def drop_tip(
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
PauseAction,
PauseSource,
StopAction,
ResumeFromRecoveryAction,
FinishAction,
HardwareStoppedAction,
QueueCommandAction,
Expand Down Expand Up @@ -38,6 +39,7 @@
"PlayAction",
"PauseAction",
"StopAction",
"ResumeFromRecoveryAction",
"FinishAction",
"HardwareStoppedAction",
"QueueCommandAction",
Expand Down
21 changes: 21 additions & 0 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,32 @@ class FailCommandAction:
"""

command_id: str
"""The command to fail."""

error_id: str
"""An ID to assign to the command's error.
Must be unique to this occurrence of the error.
"""

failed_at: datetime
"""When the command failed."""

error: EnumeratedError
"""The underlying exception that caused this command to fail."""

notes: List[CommandNote]
"""Overwrite the command's `.notes` with these."""

type: ErrorRecoveryType
"""How this error should be handled in the context of the overall run."""

# This is a quick hack so FailCommandAction handlers can get the params of the
# command that failed. We probably want this to be a new "failure details"
# object instead, similar to how succeeded commands can send a "private result"
# to Protocol Engine internals.
running_command: Command
"""The command to fail, in its prior `running` state."""


@dataclass(frozen=True)
Expand Down
23 changes: 23 additions & 0 deletions api/src/opentrons/protocol_engine/clients/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,29 @@ def pick_up_tip(

return cast(commands.PickUpTipResult, result)

def pick_up_tip_wait_for_recovery(
self,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: WellLocation,
) -> commands.PickUpTip:
"""Execute a PickUpTip, wait for any error recovery, and return it.
Note that the returned command will not necessarily have a `result`.
"""
request = commands.PickUpTipCreate(
params=commands.PickUpTipParams(
pipetteId=pipette_id,
labwareId=labware_id,
wellName=well_name,
wellLocation=well_location,
)
)
command = self._transport.execute_command_wait_for_recovery(request=request)

return cast(commands.PickUpTip, command)

def drop_tip(
self,
pipette_id: str,
Expand Down
115 changes: 96 additions & 19 deletions api/src/opentrons/protocol_engine/clients/transports.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
"""A helper for controlling a `ProtocolEngine` without async/await."""
from asyncio import AbstractEventLoop, run_coroutine_threadsafe
from typing import Any, overload
from typing import Any, Final, overload
from typing_extensions import Literal

from opentrons_shared_data.labware.dev_types import LabwareUri
from opentrons_shared_data.labware.labware_definition import LabwareDefinition


from ..protocol_engine import ProtocolEngine
from ..errors import ProtocolCommandFailedError
from ..error_recovery_policy import ErrorRecoveryType
from ..state import StateView
from ..commands import CommandCreate, CommandResult
from ..commands import Command, CommandCreate, CommandResult, CommandStatus


class RunStoppedBeforeCommandError(RuntimeError):
"""Raised if the ProtocolEngine was stopped before a command could start."""

def __init__(self, command: Command) -> None:
self._command = command
super().__init__(
f"The run was stopped"
f" before {command.commandType} command {command.id} could execute."
)


class ChildThreadTransport:
Expand All @@ -30,16 +43,22 @@ def __init__(self, engine: ProtocolEngine, loop: AbstractEventLoop) -> None:
want to synchronously access it.
loop: The event loop that `engine` is running in (in the other thread).
"""
self._engine = engine
self._loop = loop
# We might access these from different threads,
# so let's make them Final for (shallow) immutability.
self._engine: Final = engine
self._loop: Final = loop

@property
def state(self) -> StateView:
"""Get a view of the Protocol Engine's state."""
return self._engine.state_view

def execute_command(self, request: CommandCreate) -> CommandResult:
"""Execute a ProtocolEngine command, blocking until the command completes.
"""Execute a ProtocolEngine command.
This blocks until the command completes. If the command fails, this will always
raise the failure as an exception--even if ProtocolEngine deemed the failure
recoverable.
Args:
request: The ProtocolEngine command request
Expand All @@ -48,8 +67,11 @@ def execute_command(self, request: CommandCreate) -> CommandResult:
The command's result data.
Raises:
ProtocolEngineError: if the command execution is not successful,
the specific error that cause the command to fail is raised.
ProtocolEngineError: If the command execution was not successful,
the specific error that caused the command to fail is raised.
If the run was stopped before the command could complete, that's
also signaled as this exception.
"""
command = run_coroutine_threadsafe(
self._engine.add_and_execute_command(request=request),
Expand All @@ -64,21 +86,76 @@ def execute_command(self, request: CommandCreate) -> CommandResult:
message=f"{error.errorType}: {error.detail}",
)

# FIXME(mm, 2023-04-10): This assert can easily trigger from this sequence:
#
# 1. The engine is paused.
# 2. The user's Python script calls this method to start a new command,
# which remains `queued` because of the pause.
# 3. The engine is stopped.
#
# The returned command will be `queued`, so it won't have a result.
#
# We need to figure out a proper way to report this condition to callers
# so they correctly interpret it as an intentional stop, not an internal error.
assert command.result is not None, f"Expected Command {command} to have result"
if command.result is None:
# This can happen with a certain pause timing:
#
# 1. The engine is paused.
# 2. The user's Python script calls this method to start a new command,
# which remains `queued` because of the pause.
# 3. The engine is stopped. The returned command will be `queued`
# and won't have a result.
raise RunStoppedBeforeCommandError(command)

return command.result

def execute_command_wait_for_recovery(self, request: CommandCreate) -> Command:
"""Execute a ProtocolEngine command, including error recovery.
This blocks until the command completes. Additionally, if the command fails,
this will continue to block until its error recovery has been completed.
Args:
request: The ProtocolEngine command request.
Returns:
The command. If error recovery happened for it, the command will be
reported here as failed.
Raises:
ProtocolEngineError: If the command failed, *and* the failure was not
recovered from.
If the run was stopped before the command could complete, that's
also signalled as this exception.
"""

async def run_in_pe_thread() -> Command:
command = await self._engine.add_and_execute_command_wait_for_recovery(
request=request
)

if command.error is not None:
error_was_recovered_from = (
self._engine.state_view.commands.get_error_recovery_type(command.id)
== ErrorRecoveryType.WAIT_FOR_RECOVERY
)
if not error_was_recovered_from:
error = command.error
# TODO: this needs to have an actual code
raise ProtocolCommandFailedError(
original_error=error,
message=f"{error.errorType}: {error.detail}",
)

elif command.status == CommandStatus.QUEUED:
# This can happen with a certain pause timing:
#
# 1. The engine is paused.
# 2. The user's Python script calls this method to start a new command,
# which remains `queued` because of the pause.
# 3. The engine is stopped. The returned command will be `queued`,
# and won't have a result.
raise RunStoppedBeforeCommandError(command)

return command

command = run_coroutine_threadsafe(
run_in_pe_thread(),
loop=self._loop,
).result()

return command

@overload
def call_method(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async def execute(self, command_id: str) -> None:
FailCommandAction(
error=error,
command_id=running_command.id,
running_command=running_command,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
notes=note_tracker.get_notes(),
Expand Down
3 changes: 3 additions & 0 deletions api/src/opentrons/protocol_engine/execution/queue_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ async def _run_commands(self) -> None:
command_id = await self._state_store.wait_for(
condition=self._state_store.commands.get_next_to_execute
)
# Assert for type hinting. This is valid because the wait_for() above
# only returns when the value is truthy.
assert command_id is not None
except RunStoppedError:
# There are no more commands that we should execute, either because the run has
# completed on its own, or because a client requested it to stop.
Expand Down
81 changes: 71 additions & 10 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,45 @@ async def add_and_execute_command(
the command in state.
Returns:
The command. If the command completed, it will be succeeded or failed.
The command.
If the command completed, it will be succeeded or failed.
If the engine was stopped before it reached the command,
the command will be queued.
"""
command = self.add_command(request)
await self.wait_for_command(command.id)
return self._state_store.commands.get(command.id)

async def add_and_execute_command_wait_for_recovery(
self, request: commands.CommandCreate
) -> commands.Command:
"""Like `add_and_execute_command()`, except wait for error recovery.
Unlike `add_and_execute_command()`, if the command fails, this will not
immediately return the failed command. Instead, if the error is recoverable,
it will wait until error recovery has completed (e.g. when some other task
calls `self.resume_from_recovery()`).
Returns:
The command.
If the command completed, it will be succeeded or failed. If it failed
and then its failure was recovered from, it will still be failed.
If the engine was stopped before it reached the command,
the command will be queued.
"""
queued_command = self.add_command(request)
await self.wait_for_command(command_id=queued_command.id)
completed_command = self._state_store.commands.get(queued_command.id)
await self._state_store.wait_for_not(
self.state_view.commands.get_recovery_in_progress_for_command,
queued_command.id,
)
return completed_command

def estop(
self,
# TODO(mm, 2024-03-26): Maintenance runs are a robot-server concept that
Expand All @@ -251,6 +282,15 @@ def estop(
) -> None:
"""Signal to the engine that an estop event occurred.
If an estop happens while the robot is moving, lower layers physically stop
motion and raise the event as an exception, which fails the Protocol Engine
command. No action from the `ProtocolEngine` caller is needed to handle that.
However, if an estop happens in between commands, or in the middle of
a command like `comment` or `waitForDuration` that doesn't access the hardware,
`ProtocolEngine` needs to be told about it so it can treat it as a fatal run
error and stop executing more commands. This method is how to do that.
If there are any queued commands for the engine, they will be marked
as failed due to the estop event. If there aren't any queued commands
*and* this is a maintenance run (which has commands queued one-by-one),
Expand All @@ -261,15 +301,27 @@ def estop(
"""
if self._state_store.commands.get_is_stopped():
return

current_id = (
running_or_next_queued_id = (
self._state_store.commands.get_running_command_id()
or self._state_store.commands.get_queue_ids().head(None)
# TODO(mm, 2024-04-02): This logic looks wrong whenever the next queued
# command is a setup command, which is the normal case in maintenance
# runs. Setup commands won't show up in commands.get_queue_ids().
)
running_or_next_queued = (
self._state_store.commands.get(running_or_next_queued_id)
if running_or_next_queued_id is not None
else None
)

if current_id is not None:
if running_or_next_queued_id is not None:
assert running_or_next_queued is not None

fail_action = FailCommandAction(
command_id=current_id,
command_id=running_or_next_queued_id,
# FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726,
# this action is only legal if the command is running, not queued.
running_command=running_or_next_queued,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
Expand All @@ -278,12 +330,21 @@ def estop(
)
self._action_dispatcher.dispatch(fail_action)

# In the case where the running command was a setup command - check if there
# are any pending *run* commands and, if so, clear them all
current_id = self._state_store.commands.get_queue_ids().head(None)
if current_id is not None:
# The FailCommandAction above will have cleared all the queued protocol
# OR setup commands, depending on whether we gave it a protocol or setup
# command. We want both to be cleared in either case. So, do that here.
running_or_next_queued_id = self._state_store.commands.get_queue_ids().head(
None
)
if running_or_next_queued_id is not None:
running_or_next_queued = self._state_store.commands.get(
running_or_next_queued_id
)
fail_action = FailCommandAction(
command_id=current_id,
command_id=running_or_next_queued_id,
# FIXME(mm, 2024-04-02): As of https://github.com/Opentrons/opentrons/pull/14726,
# this action is only legal if the command is running, not queued.
running_command=running_or_next_queued,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
Expand Down
Loading

0 comments on commit 1819b8c

Please sign in to comment.