From 15fabaf9fd9ec6df8bab8043cf6997de6b0f99ea Mon Sep 17 00:00:00 2001
From: prha <1040172+prha@users.noreply.github.com>
Date: Wed, 27 Sep 2023 15:28:52 -0700
Subject: [PATCH] double write run status change events to sqlite index shard
 (#16814)

## Summary & Motivation

We want to move away from custom cursor objects (e.g. `RunShardedEventsCursor`)
toward uniform storage ids. This allows for a cleaner public API for new storage
methods, while keeping storage-id cursors supported by run-sharded storages like
sqlite. Mirroring run status change events into the sqlite index shard lets them
be queried with a plain monotonic storage id, just as asset events already are.

## How I Tested These Changes

BK
---
 .../event_log/sqlite/sqlite_event_log.py          | 11 ++++++++---
 .../dagster/_core/storage/legacy_storage.py       |  3 +++
 .../definitions_tests/test_sensor_invocation.py   |  4 ++--
 .../execution_tests/test_data_versions.py         |  2 +-
 .../storage_tests/utils/event_log_storage.py      | 17 +++++++++++++++++
 5 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py
index 1289cd6f3a285..d266d5fd284af 100644
--- a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py
+++ b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py
@@ -25,7 +25,7 @@
 from dagster._core.definitions.events import AssetKey
 from dagster._core.errors import DagsterInvariantViolationError
 from dagster._core.event_api import EventHandlerFn
-from dagster._core.events import ASSET_CHECK_EVENTS, ASSET_EVENTS
+from dagster._core.events import ASSET_CHECK_EVENTS, ASSET_EVENTS, EVENT_TYPE_TO_PIPELINE_RUN_STATUS
 from dagster._core.events.log import EventLogEntry
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
 from dagster._core.storage.event_log.base import EventLogCursor, EventLogRecord, EventRecordsFilter
@@ -263,6 +263,11 @@ def store_event(self, event: EventLogEntry) -> None:
         if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS:
             self.store_asset_check_event(event, None)

+        if event.is_dagster_event and event.dagster_event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS:
+            # should mirror run status change events in the index shard
+            with self.index_connection() as conn:
+                result = conn.execute(insert_event_statement)
+
     def get_event_records(
         self,
         event_records_filter: EventRecordsFilter,
@@ -280,8 +285,8 @@ def get_event_records(

         is_asset_query = event_records_filter and event_records_filter.event_type in ASSET_EVENTS
         if is_asset_query:
-            # asset materializations, observations and materialization planned events
-            # get mirrored into the index shard, so no custom run shard-aware cursor logic needed
+            # asset materializations, observations and materialization planned events get mirrored
+            # into the index shard, so no custom run shard-aware cursor logic needed
             return super(SqliteEventLogStorage, self).get_event_records(
                 event_records_filter=event_records_filter, limit=limit, ascending=ascending
             )
diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py
index 9220b8d349f03..9402303bf7a8f 100644
--- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py
+++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py
@@ -376,6 +376,9 @@ def from_config_value(
     def _instance(self) -> Optional["DagsterInstance"]:
         return self._storage._instance  # noqa: SLF001

+    def index_connection(self):
+        return self._storage.event_log_storage.index_connection()
+
     def register_instance(self, instance: "DagsterInstance") -> None:
         if not self._storage.has_instance:
             self._storage.register_instance(instance)
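For context on the two changes above: every event still lands in the per-run sqlite shard, but run status change events are now additionally written to the shared index shard, whose autoincrementing id gives a single monotonic cursor across runs. The sketch below illustrates that double-write pattern with nothing but the standard `sqlite3` module; the table name, event type values, and helper functions are illustrative stand-ins, not Dagster's actual schema or API.

```python
import sqlite3

# event types that should be mirrored into the index shard (illustrative values)
RUN_STATUS_EVENT_TYPES = {"PIPELINE_STARTING", "PIPELINE_SUCCESS", "PIPELINE_FAILURE"}


def _ensure_table(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS event_logs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " run_id TEXT, event_type TEXT, body TEXT)"
    )


def store_event(run_id: str, event_type: str, body: str) -> None:
    insert = "INSERT INTO event_logs (run_id, event_type, body) VALUES (?, ?, ?)"
    # every event lands in the per-run shard (one sqlite file per run)
    with sqlite3.connect(f"{run_id}.db") as run_conn:
        _ensure_table(run_conn)
        run_conn.execute(insert, (run_id, event_type, body))
    # run status changes are double-written to the shared index shard
    if event_type in RUN_STATUS_EVENT_TYPES:
        with sqlite3.connect("index.db") as index_conn:
            _ensure_table(index_conn)
            index_conn.execute(insert, (run_id, event_type, body))


def run_status_events_after(storage_id: int) -> list:
    # a plain integer cursor works because the index shard spans all runs
    with sqlite3.connect("index.db") as index_conn:
        _ensure_table(index_conn)
        return index_conn.execute(
            "SELECT id, run_id, event_type FROM event_logs WHERE id > ? ORDER BY id",
            (storage_id,),
        ).fetchall()
```

Because every mirrored row gets its id from a single table, a reader can simply resume from the highest id it has seen, which is exactly what makes a custom run-shard-aware cursor like `RunShardedEventsCursor` unnecessary for these event types.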
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py b/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py
index 4c6a4d87bf1e0..b7bcea01436d2 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py
@@ -964,8 +964,8 @@ def two_asset_sensor(context):
         assert sensor_data.run_requests[0].tags["dagster/partition"] == "2022-08-01"
         assert (
             ctx.cursor
-            == '{"AssetKey([\'daily_partitions_asset\'])": ["2022-08-01", 3, {}],'
-            ' "AssetKey([\'daily_partitions_asset_2\'])": ["2022-08-01", 4, {}]}'
+            == '{"AssetKey([\'daily_partitions_asset\'])": ["2022-08-01", 4, {}],'
+            ' "AssetKey([\'daily_partitions_asset_2\'])": ["2022-08-01", 5, {}]}'
         )
diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py b/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py
index 1a39218457a3d..e5616c23bb623 100644
--- a/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py
+++ b/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py
@@ -979,7 +979,7 @@ def mocked_get_input_data_version_tag(
     assert extract_data_provenance_from_entry(record.event_log_entry) == DataProvenance(
         code_version="1",
         input_data_versions={AssetKey(["foo"]): DataVersion("alpha")},
-        input_storage_ids={AssetKey(["foo"]): 3},
+        input_storage_ids={AssetKey(["foo"]): 4},
         is_user_provided=True,
     )
diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py
index 0c554071b1130..c65bd855755da 100644
--- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py
+++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py
@@ -48,6 +48,7 @@
 from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
 from dagster._core.errors import DagsterInvalidInvocationError
 from dagster._core.events import (
+    EVENT_TYPE_TO_PIPELINE_RUN_STATUS,
     AssetMaterializationPlannedData,
     AssetObservationData,
     DagsterEvent,
@@ -77,8 +78,10 @@
     EVENT_LOG_DATA_MIGRATIONS,
     migrate_asset_key_data,
 )
+from dagster._core.storage.event_log.schema import SqlEventLogStorageTable
 from dagster._core.storage.event_log.sqlite.sqlite_event_log import SqliteEventLogStorage
 from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
+from dagster._core.storage.sqlalchemy_compat import db_select
 from dagster._core.test_utils import create_run_for_test, instance_for_test
 from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
 from dagster._core.utils import make_new_run_id
@@ -1496,6 +1499,20 @@ def a_job():
             ),
         )

+        # check that events were double-written to the index shard
+        with storage.index_connection() as conn:
+            run_status_change_events = conn.execute(
+                db_select([SqlEventLogStorageTable.c.id]).where(
+                    SqlEventLogStorageTable.c.dagster_event_type.in_(
+                        [
+                            event_type.value
+                            for event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.keys()
+                        ]
+                    )
+                )
+            ).fetchall()
+        assert len(run_status_change_events) == 6
+
     # .watch() is async, there's a small chance they don't run before the asserts
     @pytest.mark.flaky(reruns=1)
     def test_watch_exc_recovery(self, storage):
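With run status change events mirrored as above (the test asserts six of them in the index shard), callers can page through them using plain integer storage ids instead of a `RunShardedEventsCursor`. The sketch below is not code from this PR; it assumes the public `DagsterInstance.get_event_records` and `EventRecordsFilter(after_cursor=...)` signatures in this release, and `poll_run_successes` is a hypothetical helper.

```python
from typing import Optional

from dagster import DagsterEventType, DagsterInstance, EventRecordsFilter


def poll_run_successes(instance: DagsterInstance, last_storage_id: Optional[int]) -> Optional[int]:
    """Fetch RUN_SUCCESS events newer than the last seen storage id and return the new cursor."""
    records = list(
        instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.RUN_SUCCESS,
                after_cursor=last_storage_id,
            ),
            limit=100,
            ascending=True,
        )
    )
    for record in records:
        # each record carries a globally increasing storage id plus the underlying log entry
        print(record.storage_id, record.event_log_entry.run_id)
    return records[-1].storage_id if records else last_storage_id
```

On a sqlite-backed instance, integer cursors like this only work for event types mirrored into the index shard, which is what this change extends to run status changes.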