Skip to content

Commit

Permalink
double write run status change events to sqlite index shard (#16814)
Browse files Browse the repository at this point in the history
## Summary & Motivation
We want to move away from custom cursor objects (e.g.
`RunShardedEventsCursor`) and toward uniform storage ids.

This allows for a cleaner public API for new storage methods, while
still being supported by run-sharded storages like sqlite.
 
## How I Tested These Changes
BK (Buildkite CI)
  • Loading branch information
prha authored Sep 27, 2023
1 parent 9960687 commit 15fabaf
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from dagster._core.definitions.events import AssetKey
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.event_api import EventHandlerFn
from dagster._core.events import ASSET_CHECK_EVENTS, ASSET_EVENTS
from dagster._core.events import ASSET_CHECK_EVENTS, ASSET_EVENTS, EVENT_TYPE_TO_PIPELINE_RUN_STATUS
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._core.storage.event_log.base import EventLogCursor, EventLogRecord, EventRecordsFilter
Expand Down Expand Up @@ -263,6 +263,11 @@ def store_event(self, event: EventLogEntry) -> None:
if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS:
self.store_asset_check_event(event, None)

if event.is_dagster_event and event.dagster_event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS:
# should mirror run status change events in the index shard
with self.index_connection() as conn:
result = conn.execute(insert_event_statement)

def get_event_records(
self,
event_records_filter: EventRecordsFilter,
Expand All @@ -280,8 +285,8 @@ def get_event_records(

is_asset_query = event_records_filter and event_records_filter.event_type in ASSET_EVENTS
if is_asset_query:
# asset materializations, observations and materialization planned events
# get mirrored into the index shard, so no custom run shard-aware cursor logic needed
# asset materializations, observations and materialization planned events get mirrored
# into the index shard, so no custom run shard-aware cursor logic needed
return super(SqliteEventLogStorage, self).get_event_records(
event_records_filter=event_records_filter, limit=limit, ascending=ascending
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ def from_config_value(
# Returns the wrapped storage's private `_instance` attribute, typed as an
# optional DagsterInstance. The `noqa: SLF001` marker suppresses the lint
# warning for reaching into another object's private member.
def _instance(self) -> Optional["DagsterInstance"]:
return self._storage._instance  # noqa: SLF001

# Pass-through: returns whatever the wrapped storage's
# `event_log_storage.index_connection()` returns. Other code in this change
# uses that result as a context manager (`with ... as conn:`).
# NOTE(review): presumably only meaningful when the wrapped event log storage
# defines `index_connection` (e.g. the sqlite run-sharded storage) - confirm.
def index_connection(self):
return self._storage.event_log_storage.index_connection()

def register_instance(self, instance: "DagsterInstance") -> None:
if not self._storage.has_instance:
self._storage.register_instance(instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,8 +964,8 @@ def two_asset_sensor(context):
assert sensor_data.run_requests[0].tags["dagster/partition"] == "2022-08-01"
assert (
ctx.cursor
== '{"AssetKey([\'daily_partitions_asset\'])": ["2022-08-01", 3, {}],'
' "AssetKey([\'daily_partitions_asset_2\'])": ["2022-08-01", 4, {}]}'
== '{"AssetKey([\'daily_partitions_asset\'])": ["2022-08-01", 4, {}],'
' "AssetKey([\'daily_partitions_asset_2\'])": ["2022-08-01", 5, {}]}'
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ def mocked_get_input_data_version_tag(
assert extract_data_provenance_from_entry(record.event_log_entry) == DataProvenance(
code_version="1",
input_data_versions={AssetKey(["foo"]): DataVersion("alpha")},
input_storage_ids={AssetKey(["foo"]): 3},
input_storage_ids={AssetKey(["foo"]): 4},
is_user_provided=True,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.events import (
EVENT_TYPE_TO_PIPELINE_RUN_STATUS,
AssetMaterializationPlannedData,
AssetObservationData,
DagsterEvent,
Expand Down Expand Up @@ -77,8 +78,10 @@
EVENT_LOG_DATA_MIGRATIONS,
migrate_asset_key_data,
)
from dagster._core.storage.event_log.schema import SqlEventLogStorageTable
from dagster._core.storage.event_log.sqlite.sqlite_event_log import SqliteEventLogStorage
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
from dagster._core.storage.sqlalchemy_compat import db_select
from dagster._core.test_utils import create_run_for_test, instance_for_test
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.utils import make_new_run_id
Expand Down Expand Up @@ -1496,6 +1499,20 @@ def a_job():
),
)

# check that events were double-written to the index shard
with storage.index_connection() as conn:
run_status_change_events = conn.execute(
db_select([SqlEventLogStorageTable.c.id]).where(
SqlEventLogStorageTable.c.dagster_event_type.in_(
[
event_type.value
for event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.keys()
]
)
)
).fetchall()
assert len(run_status_change_events) == 6

# .watch() is async, there's a small chance they don't run before the asserts
@pytest.mark.flaky(reruns=1)
def test_watch_exc_recovery(self, storage):
Expand Down

0 comments on commit 15fabaf

Please sign in to comment.