Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor time-based tests to use fake and sharp loop time #881

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kopf/_cogs/aiokits/aioenums.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def is_set(self, reason: Optional[FlagReasonT] = None) -> bool:

def set(self, reason: Optional[FlagReasonT] = None) -> None:
reason = reason if reason is not None else self.reason # to keep existing values
self.when = self.when if self.when is not None else time.monotonic()
self.when = self.when if self.when is not None else asyncio.get_running_loop().time()
self.reason = reason if self.reason is None or reason is None else self.reason | reason
self.sync_event.set()
self.async_event.set() # it is thread-safe: always called in operator's event loop.
Expand Down
2 changes: 1 addition & 1 deletion kopf/_cogs/configs/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from kopf._cogs.structs import bodies, dicts, ids, patches


class ProgressRecord(TypedDict, total=True):
class ProgressRecord(TypedDict, total=False):
""" A single record stored for persistence of a single handler. """
started: Optional[str]
stopped: Optional[str]
Expand Down
104 changes: 67 additions & 37 deletions kopf/_core/actions/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,44 +86,22 @@ class Outcome:
subrefs: Collection[ids.HandlerId] = ()


@dataclasses.dataclass(frozen=True)
class HandlerState:
class HandlerState(Protocol):
"""
A persisted state of a single handler, as stored on the resource's status.

Note the difference: `Outcome` is for in-memory results of handlers,
which are then additionally converted before being stored as a state.
A minimal necessary protocol (interface) of a handler's runtime state.

Active handler states are those used in .done/.delays for the current
handling cycle & the current cause. Passive handler states are those
carried over for logging of counts/extras, and for final state purging,
but not participating in the current handling cycle.
The implementation and detailed fields are in `progression.HandlerState`.
"""
started: Optional[datetime.datetime] = None # None means this information was lost.
stopped: Optional[datetime.datetime] = None # None means it is still running (e.g. delayed).
delayed: Optional[datetime.datetime] = None # None means it is finished (succeeded/failed).
retries: int = 0
success: bool = False
failure: bool = False

@property
def finished(self) -> bool:
return bool(self.success or self.failure)

@property
def sleeping(self) -> bool:
ts = self.delayed
now = datetime.datetime.utcnow()
return not self.finished and ts is not None and ts > now
started: datetime.datetime
retries: int

@property
def awakened(self) -> bool:
return bool(not self.finished and not self.sleeping)
raise NotImplementedError

@property
def runtime(self) -> datetime.timedelta:
now = datetime.datetime.utcnow()
return now - (self.started if self.started else now)
raise NotImplementedError


class State(Mapping[ids.HandlerId, HandlerState]):
Expand Down Expand Up @@ -267,17 +245,18 @@ async def execute_handler_once(
try:
logger.debug(f"{handler} is invoked.")

# Strict checks — contrary to the look-ahead checks below, which are approximate.
# The unforeseen extra time could be added by e.g. operator or cluster downtime.
if handler.timeout is not None and state.runtime.total_seconds() >= handler.timeout:
raise HandlerTimeoutError(f"{handler} has timed out after {state.runtime}.")

if handler.retries is not None and state.retries >= handler.retries:
raise HandlerRetriesError(f"{handler} has exceeded {state.retries} retries.")

result = await invoke_handler(
handler=handler,
cause=cause,
retry=state.retries,
started=state.started or datetime.datetime.utcnow(), # "or" is for type-checking.
started=state.started,
runtime=state.runtime,
settings=settings,
lifecycle=lifecycle, # just a default for the sub-handlers, not used directly.
Expand All @@ -297,11 +276,30 @@ async def execute_handler_once(

# Definitely a temporary error, regardless of the error strictness.
except TemporaryError as e:
logger.error(f"{handler} failed temporarily: {str(e) or repr(e)}")
return Outcome(final=False, exception=e, delay=e.delay, subrefs=subrefs)
# Maybe false-negative but never false-positive checks to save extra cycles & time wasted.
lookahead_runtime = state.runtime.total_seconds() + (e.delay or 0)
lookahead_timeout = handler.timeout is not None and lookahead_runtime >= handler.timeout
lookahead_retries = handler.retries is not None and state.retries + 1 >= handler.retries
if lookahead_timeout:
msg = (
f"{handler} failed temporarily but will time out after {handler.timeout} seconds: "
f"{str(e) or repr(e)}"
)
logger.error(msg)
return Outcome(final=True, exception=HandlerTimeoutError(msg), subrefs=subrefs)
elif lookahead_retries:
msg = (
f"{handler} failed temporarily but will exceed {handler.retries} retries: "
f"{str(e) or repr(e)}"
)
logger.error(msg)
return Outcome(final=True, exception=HandlerRetriesError(msg), subrefs=subrefs)
else:
logger.error(f"{handler} failed temporarily: {str(e) or repr(e)}")
return Outcome(final=False, exception=e, delay=e.delay, subrefs=subrefs)

# Same as permanent errors below, but with better logging for our internal cases.
except HandlerTimeoutError as e:
except (HandlerTimeoutError, HandlerRetriesError) as e:
logger.error(f"{str(e) or repr(e)}") # already formatted
return Outcome(final=True, exception=e, subrefs=subrefs)
# TODO: report the handling failure somehow (beside logs/events). persistent status?
Expand All @@ -314,14 +312,46 @@ async def execute_handler_once(

# Regular errors behave as either temporary or permanent depending on the error strictness.
except Exception as e:
# Maybe false-negative but never false-positive checks to save extra cycles & time wasted.
lookahead_runtime = state.runtime.total_seconds() + backoff
lookahead_timeout = handler.timeout is not None and lookahead_runtime >= handler.timeout
lookahead_retries = handler.retries is not None and state.retries + 1 >= handler.retries
if errors_mode == ErrorsMode.IGNORED:
logger.exception(f"{handler} failed with an exception. Will ignore.")
msg = (
f"{handler} failed with an exception and will ignore it: "
f"{str(e) or repr(e)}"
)
logger.exception(msg)
return Outcome(final=True, subrefs=subrefs)
elif errors_mode == ErrorsMode.TEMPORARY and lookahead_timeout:
msg = (
f"{handler} failed with an exception and will stop now "
f"(it would time out in {handler.timeout} seconds on the next attempt): "
f"{str(e) or repr(e)}"
)
logger.exception(msg)
return Outcome(final=True, exception=HandlerTimeoutError(msg), subrefs=subrefs)
elif errors_mode == ErrorsMode.TEMPORARY and lookahead_retries:
msg = (
f"{handler} failed with an exception and will stop now "
f"(it would exceed {handler.retries} retries on the next attempt): "
f"{str(e) or repr(e)}"
)
logger.exception(msg)
return Outcome(final=True, exception=HandlerRetriesError(msg), subrefs=subrefs)
elif errors_mode == ErrorsMode.TEMPORARY:
logger.exception(f"{handler} failed with an exception. Will retry.")
msg = (
f"{handler} failed with an exception and will try again in {backoff} seconds: "
f"{str(e) or repr(e)}"
)
logger.exception(msg)
return Outcome(final=False, exception=e, delay=backoff, subrefs=subrefs)
elif errors_mode == ErrorsMode.PERMANENT:
logger.exception(f"{handler} failed with an exception. Will stop.")
msg = (
f"{handler} failed with an exception and will stop now: "
f"{str(e) or repr(e)}"
)
logger.exception(msg)
return Outcome(final=True, exception=e, subrefs=subrefs)
# TODO: report the handling failure somehow (beside logs/events). persistent status?
else:
Expand Down
Loading