Improved crash and error recovery in the asset daemon (#17344)
Summary:
This PR adds logic to the asset daemon to ensure that if it crashes or
raises an exception in the middle of a tick, subsequent ticks resume
where they left off without launching duplicate runs.

It allows us to submit runs one by one as they are computed, rather than
computing the snapshots for every run in the tick before launching any of
them (which matters if there is a failure partway through).

The way that we do this safely is by (a short sketch follows the list):
- First computing the evaluations, getting a bunch of run requests out
- Storing those run requests on the tick, along with a bunch of reserved run IDs
- Before each run is launched, double-checking that it wasn't already launched
- Writing asset evaluations before the runs are launched (without the run IDs) and
again afterwards (including the run IDs), so that we don't lose evaluations
either if there are crashes
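A minimal sketch of that flow, assuming hypothetical `persist_tick` and `launch_run` callables standing in for the daemon's actual storage and launch code (everything else uses APIs shown in this diff):

```python
from typing import Callable, Sequence

from dagster import DagsterInstance, RunRequest
from dagster._core.scheduler.instigation import InstigatorTick
from dagster._core.utils import make_new_run_id


def submit_tick_runs(
    instance: DagsterInstance,
    tick: InstigatorTick,
    run_requests: Sequence[RunRequest],
    persist_tick: Callable[[InstigatorTick], None],
    launch_run: Callable[[RunRequest, str], None],
) -> None:
    # Reserve a run ID for every run request up front and store both on the
    # tick before launching anything, so a crash after this point cannot
    # produce duplicate runs on the next tick.
    reserved_run_ids = [make_new_run_id() for _ in run_requests]
    persist_tick(tick.with_run_requests(run_requests, reserved_run_ids=reserved_run_ids))

    for run_request, run_id in zip(run_requests, reserved_run_ids):
        # Double-check that a previous (crashed) iteration didn't already
        # launch a run with this reserved ID before launching it.
        if instance.get_run_by_id(run_id) is not None:
            continue
        launch_run(run_request, run_id)
```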

To account for crashing partway through, the daemon first looks at the
state of the most recent tick. If it crashed (i.e. it is STARTED) or
raised an exception (it is FAILED), it pulls the in-progress run
requests and cursor off of that tick and picks up where it left off.
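A sketch of that recovery check, assuming the daemon already has the most recent tick in hand (the helper name is illustrative; the fields come from the `TickData` changes below, and the enum member for a failed tick is `TickStatus.FAILURE`):

```python
from typing import Optional, Sequence, Tuple

from dagster import RunRequest
from dagster._core.scheduler.instigation import InstigatorTick, TickStatus


def get_resume_state(
    latest_tick: Optional[InstigatorTick],
) -> Optional[Tuple[Sequence[RunRequest], Sequence[str], Optional[str]]]:
    """Return (run_requests, reserved_run_ids, cursor) to resume, or None to start fresh."""
    if latest_tick is None:
        return None
    # STARTED means the previous tick crashed mid-flight; FAILURE means it raised
    # an exception. In both cases, reuse the stored run requests and cursor so the
    # new tick finishes the old tick's work instead of recomputing it.
    if latest_tick.status in (TickStatus.STARTED, TickStatus.FAILURE):
        data = latest_tick.tick_data
        if data.run_requests:
            return (data.run_requests, data.reserved_run_ids or [], data.cursor)
    return None
```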

The error case only retries a certain number of times before writing the
cursor anyway and moving on (this is a change from the current behavior, but
it tolerates a certain number of transient errors before giving up on the tick).

Much like with the scheduler, a "UserCodeUnreachableError" (e.g. a code
server is down, or the Dagster Cloud agent is down) causes ticks to
retry indefinitely until the code is available again.
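A minimal sketch of how those two error cases could be told apart, using the new `auto_materialize_max_tick_retries` instance setting added in this PR and assuming the exception referenced above is `DagsterUserCodeUnreachableError` from `dagster._core.errors` (the function itself is illustrative, not the daemon's actual code):

```python
from dagster import DagsterInstance
from dagster._core.errors import DagsterUserCodeUnreachableError


def should_advance_cursor(
    instance: DagsterInstance, error: Exception, failure_count: int
) -> bool:
    """Decide whether a failed tick should give up and write its cursor anyway."""
    if isinstance(error, DagsterUserCodeUnreachableError):
        # The code server / agent is unreachable: never advance the cursor;
        # keep retrying until the code location is available again.
        return False
    # Ordinary errors: retry the tick up to `max_tick_retries` times, then move
    # on so a run of transient errors doesn't wedge the daemon forever.
    return failure_count > instance.auto_materialize_max_tick_retries
```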

Still a bit more testing to add here, but I figured it was ready for any
initial feedback.

## Summary & Motivation

## How I Tested These Changes
gibsondan authored Nov 1, 2023
1 parent 926b209 commit d4877c4
Showing 7 changed files with 1,008 additions and 315 deletions.
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -913,6 +913,10 @@ def auto_materialize_respect_materialization_data_versions(self) -> bool:
"respect_materialization_data_versions", False
)

@property
def auto_materialize_max_tick_retries(self) -> int:
return self.get_settings("auto_materialize").get("max_tick_retries", 3)

# python logs

@property
18 changes: 17 additions & 1 deletion python_modules/dagster/dagster/_core/instance/config.py
@@ -7,7 +7,15 @@
Bool,
_check as check,
)
from dagster._config import Field, Permissive, ScalarUnion, Selector, StringSource, validate_config
from dagster._config import (
Field,
IntSource,
Permissive,
ScalarUnion,
Selector,
StringSource,
validate_config,
)
from dagster._core.errors import DagsterInvalidConfigError
from dagster._core.storage.config import mysql_config, pg_config
from dagster._serdes import class_from_code_pointer
@@ -359,6 +367,14 @@ def dagster_instance_config_schema() -> Mapping[str, Field]:
"minimum_interval_seconds": Field(int, is_required=False),
"run_tags": Field(dict, is_required=False),
"respect_materialization_data_versions": Field(Bool, is_required=False),
"max_tick_retries": Field(
IntSource,
default_value=3,
is_required=False,
description=(
"For each auto-materialize tick that raises an error, how many times to retry that tick"
),
),
}
),
}
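For illustration, a sketch of exercising the new `auto_materialize.max_tick_retries` setting with a test-scoped instance (using `instance_for_test` from `dagster._core.test_utils`, which accepts `dagster.yaml`-style overrides) and confirming it surfaces through the `auto_materialize_max_tick_retries` property added in `__init__.py` above:

```python
from dagster._core.test_utils import instance_for_test

# Override the new setting and read it back through the instance property.
with instance_for_test(
    overrides={"auto_materialize": {"max_tick_retries": 5}}
) as instance:
    assert instance.auto_materialize_max_tick_retries == 5
```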
53 changes: 34 additions & 19 deletions python_modules/dagster/dagster/_core/scheduler/instigation.py
@@ -275,12 +275,13 @@ def __new__(cls, tick_id: int, tick_data: "TickData"):
def with_status(self, status: TickStatus, **kwargs: Any):
check.inst_param(status, "status", TickStatus)
end_timestamp = pendulum.now("UTC").timestamp() if status != TickStatus.STARTED else None
return self._replace(
tick_data=self.tick_data.with_status(status, end_timestamp=end_timestamp, **kwargs)
)
kwargs["end_timestamp"] = end_timestamp
return self._replace(tick_data=self.tick_data.with_status(status, **kwargs))

def with_run_requests(self, run_requests: Sequence[RunRequest]) -> "InstigatorTick":
return self._replace(tick_data=self.tick_data.with_run_requests(run_requests))
def with_run_requests(
self, run_requests: Sequence[RunRequest], **kwargs: Any
) -> "InstigatorTick":
return self._replace(tick_data=self.tick_data.with_run_requests(run_requests, **kwargs))

def with_reason(self, skip_reason: str) -> "InstigatorTick":
check.opt_str_param(skip_reason, "skip_reason")
@@ -438,6 +439,10 @@ def requested_asset_keys(self) -> AbstractSet[AssetKey]:

return set(self.requested_assets_and_partitions.keys())

@property
def run_requests(self) -> Optional[Sequence[RunRequest]]:
return self.tick_data.run_requests


@whitelist_for_serdes(
old_storage_names={"JobTickData"},
@@ -472,6 +477,7 @@ class TickData(
("end_timestamp", Optional[float]), # Time the tick finished
("run_requests", Optional[Sequence[RunRequest]]), # run requests created by the tick
("auto_materialize_evaluation_id", Optional[int]),
("reserved_run_ids", Optional[Sequence[str]]),
],
)
):
@@ -486,15 +492,25 @@ class TickData(
status (TickStatus): The status of the tick, which can be updated
timestamp (float): The timestamp at which this instigator evaluation started
run_id (str): The run created by the tick.
run_keys (Sequence[str]): Unique user-specified identifiers for the runs created by this
instigator.
error (SerializableErrorInfo): The error caught during execution. This is set only when
the status is ``TickStatus.Failure``
skip_reason (str): message for why the tick was skipped
cursor (Optional[str]): Cursor output by this tick.
origin_run_ids (List[str]): The runs originated from the schedule/sensor.
failure_count (int): The number of times this tick has failed. If the status is not
FAILED, this is the number of previous failures before it reached the current state.
dynamic_partitions_request_results (Sequence[DynamicPartitionsRequestResult]): The results
of the dynamic partitions requests evaluated within the tick.
end_timestamp (Optional[float]): Time that this tick finished.
run_requests (Optional[Sequence[RunRequest]]): The RunRequests that were requested by this
tick. Currently only used by the AUTO_MATERIALIZE type.
auto_materialize_evaluation_id (Optional[int]): For AUTO_MATERIALIZE ticks, the evaluation ID
that can be used to index into the asset_daemon_asset_evaluations table.
reserved_run_ids (Optional[Sequence[str]]): A list of run IDs to use for each of the
run_requests. Used to ensure that if the tick fails partway through, we don't create
any duplicate runs for the tick. Currently only used by AUTO_MATERIALIZE ticks.
"""

def __new__(
@@ -519,6 +535,7 @@ def __new__(
end_timestamp: Optional[float] = None,
run_requests: Optional[Sequence[RunRequest]] = None,
auto_materialize_evaluation_id: Optional[int] = None,
reserved_run_ids: Optional[Sequence[str]] = None,
):
_validate_tick_args(instigator_type, status, run_ids, error, skip_reason)
check.opt_list_param(log_key, "log_key", of_type=str)
@@ -546,30 +563,21 @@ def __new__(
end_timestamp=end_timestamp,
run_requests=check.opt_sequence_param(run_requests, "run_requests"),
auto_materialize_evaluation_id=auto_materialize_evaluation_id,
reserved_run_ids=check.opt_sequence_param(reserved_run_ids, "reserved_run_ids"),
)

def with_status(
self,
status: TickStatus,
error: Optional[SerializableErrorInfo] = None,
timestamp: Optional[float] = None,
failure_count: Optional[int] = None,
end_timestamp: Optional[float] = None,
**kwargs,
) -> "TickData":
return TickData(
**merge_dicts(
self._asdict(),
{
"status": status,
"error": error,
"timestamp": timestamp if timestamp is not None else self.timestamp,
"failure_count": (
failure_count if failure_count is not None else self.failure_count
),
"end_timestamp": (
end_timestamp if end_timestamp is not None else self.end_timestamp
),
},
kwargs,
)
)

@@ -596,12 +604,19 @@ def with_run_info(
)
)

def with_run_requests(self, run_requests: Sequence[RunRequest]) -> "TickData":
def with_run_requests(
self,
run_requests: Sequence[RunRequest],
reserved_run_ids: Optional[Sequence[str]] = None,
cursor: Optional[str] = None,
) -> "TickData":
return TickData(
**merge_dicts(
self._asdict(),
{
"run_requests": run_requests,
"reserved_run_ids": reserved_run_ids,
"cursor": cursor,
},
)
)
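An illustrative helper (not part of this PR) showing how the extended `with_run_requests` above can record the run requests, freshly reserved run IDs, and updated cursor on a tick in one call before any runs are launched:

```python
from typing import Optional, Sequence

from dagster import RunRequest
from dagster._core.scheduler.instigation import InstigatorTick
from dagster._core.utils import make_new_run_id


def record_requests_on_tick(
    tick: InstigatorTick,
    run_requests: Sequence[RunRequest],
    new_cursor: Optional[str],
) -> InstigatorTick:
    # One reserved run ID per run request, stored alongside the requests and
    # the cursor before launching, so a crashed tick can be resumed without
    # launching duplicates or losing cursor progress.
    return tick.with_run_requests(
        run_requests,
        reserved_run_ids=[make_new_run_id() for _ in run_requests],
        cursor=new_cursor,
    )
```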