From 6499e97d15fd7c3b7b6297c35bb4382d99bf5af7 Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Mon, 29 Jul 2024 17:08:29 -0700 Subject: [PATCH] Refactor event log storage tests to use instance Summary: Allows us to add test cases to TestEventLogStorage that use the instance directly. Test Plan: BK --- .../storage_tests/test_event_log.py | 91 ++++---- .../storage_tests/utils/event_log_storage.py | 194 +++++++++--------- .../dagster_mysql_tests/test_event_log.py | 16 +- .../dagster_postgres_tests/test_event_log.py | 16 +- 4 files changed, 171 insertions(+), 146 deletions(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py b/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py index 873652737d3a6..a6886b30cdc0e 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_event_log.py @@ -9,10 +9,10 @@ import pytest import sqlalchemy import sqlalchemy as db +from dagster import DagsterInstance from dagster._core.errors import DagsterEventLogInvalidForRun from dagster._core.storage.event_log import ( ConsolidatedSqliteEventLogStorage, - InMemoryEventLogStorage, SqlEventLogStorageMetadata, SqlEventLogStorageTable, SqliteEventLogStorage, @@ -22,6 +22,7 @@ from dagster._core.storage.sql import create_engine from dagster._core.storage.sqlalchemy_compat import db_select from dagster._core.storage.sqlite_storage import DagsterSqliteStorage +from dagster._core.test_utils import instance_for_test from dagster._core.utils import make_new_run_id from dagster._utils.test import ConcurrencyEnabledSqliteTestEventLogStorage from sqlalchemy import __version__ as sqlalchemy_version @@ -34,12 +35,13 @@ class TestInMemoryEventLogStorage(TestEventLogStorage): __test__ = True @pytest.fixture(scope="function", name="storage") - def event_log_storage(self): - storage = InMemoryEventLogStorage() - try: - yield storage - finally: - storage.dispose() + def event_log_storage(self, instance): + yield instance.event_log_storage + + @pytest.fixture(name="instance", scope="function") + def instance(self): + with DagsterInstance.ephemeral() as the_instance: + yield the_instance @pytest.mark.skipif( sys.version_info >= (3, 12) and sqlalchemy_version.startswith("1.4."), @@ -52,16 +54,17 @@ def test_basic_get_logs_for_run_multiple_runs_cursors(self, instance, storage): class TestSqliteEventLogStorage(TestEventLogStorage): __test__ = True - @pytest.fixture(scope="function", name="storage") - def event_log_storage(self): - # make the temp dir in the cwd since default temp roots - # have issues with FS notif based event log watching + @pytest.fixture(name="instance", scope="function") + def instance(self): with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path: - storage = SqliteEventLogStorage(tmpdir_path) - try: - yield storage - finally: - storage.dispose() + with instance_for_test(temp_dir=tmpdir_path) as instance: + yield instance + + @pytest.fixture(scope="function", name="storage") + def event_log_storage(self, instance): + event_log_storage = instance.event_log_storage + assert isinstance(event_log_storage, SqliteEventLogStorage) + yield instance.event_log_storage def test_filesystem_event_log_storage_run_corrupted(self, storage): # URL begins sqlite:/// @@ -134,35 +137,47 @@ def test_concurrent_sqlite_event_log_connections(self, storage): class TestConsolidatedSqliteEventLogStorage(TestEventLogStorage): __test__ = True - @pytest.fixture(scope="function", name="storage") - def event_log_storage(self): - # make the temp dir in the cwd since default temp roots - # have issues with FS notif based event log watching + @pytest.fixture(name="instance", scope="function") + def instance(self): with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path: - storage = ConsolidatedSqliteEventLogStorage(tmpdir_path) - try: - yield storage - finally: - storage.dispose() + with instance_for_test( + temp_dir=tmpdir_path, + overrides={ + "event_log_storage": { + "module": "dagster.core.storage.event_log", + "class": "ConsolidatedSqliteEventLogStorage", + "config": {"base_dir": tmpdir_path}, + } + }, + ) as instance: + yield instance + + @pytest.fixture(scope="function", name="storage") + def event_log_storage(self, instance): + event_log_storage = instance.event_log_storage + assert isinstance(event_log_storage, ConsolidatedSqliteEventLogStorage) + yield event_log_storage class TestLegacyStorage(TestEventLogStorage): __test__ = True - @pytest.fixture(scope="function", name="storage") - def event_log_storage(self): - # make the temp dir in the cwd since default temp roots - # have issues with FS notif based event log watching + @pytest.fixture(name="instance", scope="function") + def instance(self): with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path: - # first create the unified storage class - storage = DagsterSqliteStorage.from_local(tmpdir_path) - # next create the legacy adapter class - legacy_storage = LegacyEventLogStorage(storage) - try: - yield legacy_storage - finally: - legacy_storage.dispose() - storage.dispose() + with instance_for_test(temp_dir=tmpdir_path) as instance: + yield instance + + @pytest.fixture(scope="function", name="storage") + def event_log_storage(self, instance): + storage = instance.get_ref().storage + assert isinstance(storage, DagsterSqliteStorage) + legacy_storage = LegacyEventLogStorage(storage) + legacy_storage.register_instance(instance) + try: + yield legacy_storage + finally: + legacy_storage.dispose() def is_sqlite(self, storage): return True diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index b69f425a48801..3aae379855b2f 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -1999,7 +1999,7 @@ def _get_storage_ids(result): ) assert _get_storage_ids(result) == [storage_id_3, storage_id_1] - def test_get_event_records_sqlite(self, storage): + def test_get_event_records_sqlite(self, storage, instance): if not self.is_sqlite(storage): pytest.skip() @@ -2032,119 +2032,109 @@ def materialize_one(_): def a_job(): materialize_one() - with instance_for_test() as instance: - if not storage.has_instance: - storage.register_instance(instance) - - # first run - execute_run( - InMemoryJob(a_job), - instance.create_run_for_job( - a_job, - run_id=run_id_1, - run_config={"loggers": {"callback": {}, "console": {}}}, - ), - instance, - ) + instance.wipe() - for event in events: - storage.store_event(event) - - run_records = instance.get_run_records() - assert len(run_records) == 1 + # first run + execute_run( + InMemoryJob(a_job), + instance.create_run_for_job( + a_job, + run_id=run_id_1, + run_config={"loggers": {"callback": {}, "console": {}}}, + ), + instance, + ) - # second run - events = [] - execute_run( - InMemoryJob(a_job), - instance.create_run_for_job( - a_job, - run_id=run_id_2, - run_config={"loggers": {"callback": {}, "console": {}}}, - ), - instance, - ) - run_records = instance.get_run_records() - assert len(run_records) == 2 - for event in events: - storage.store_event(event) + run_records = instance.get_run_records() + assert len(run_records) == 1 - # third run - events = [] - execute_run( - InMemoryJob(a_job), - instance.create_run_for_job( - a_job, - run_id=run_id_3, - run_config={"loggers": {"callback": {}, "console": {}}}, - ), - instance, - ) - run_records = instance.get_run_records() - assert len(run_records) == 3 - for event in events: - storage.store_event(event) + # second run + events = [] + execute_run( + InMemoryJob(a_job), + instance.create_run_for_job( + a_job, + run_id=run_id_2, + run_config={"loggers": {"callback": {}, "console": {}}}, + ), + instance, + ) + run_records = instance.get_run_records() + assert len(run_records) == 2 - update_timestamp = run_records[-1].update_timestamp + # third run + events = [] + execute_run( + InMemoryJob(a_job), + instance.create_run_for_job( + a_job, + run_id=run_id_3, + run_config={"loggers": {"callback": {}, "console": {}}}, + ), + instance, + ) + run_records = instance.get_run_records() + assert len(run_records) == 3 + + update_timestamp = run_records[-1].update_timestamp + + # use tz-aware cursor + filtered_records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.RUN_SUCCESS, + after_cursor=RunShardedEventsCursor( + id=0, run_updated_after=update_timestamp + ), # events after first run + ), + ascending=True, + ) - # use tz-aware cursor - filtered_records = storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.RUN_SUCCESS, - after_cursor=RunShardedEventsCursor( - id=0, run_updated_after=update_timestamp - ), # events after first run - ), - ascending=True, - ) - assert len(filtered_records) == 2 - assert _event_types([r.event_log_entry for r in filtered_records]) == [ - DagsterEventType.RUN_SUCCESS, - DagsterEventType.RUN_SUCCESS, - ] - assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3] + assert len(filtered_records) == 2 + assert _event_types([r.event_log_entry for r in filtered_records]) == [ + DagsterEventType.RUN_SUCCESS, + DagsterEventType.RUN_SUCCESS, + ] + assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3] + + # use tz-naive cursor + filtered_records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.RUN_SUCCESS, + after_cursor=RunShardedEventsCursor( + id=0, run_updated_after=update_timestamp.replace(tzinfo=None) + ), # events after first run + ), + ascending=True, + ) + assert len(filtered_records) == 2 + assert _event_types([r.event_log_entry for r in filtered_records]) == [ + DagsterEventType.RUN_SUCCESS, + DagsterEventType.RUN_SUCCESS, + ] + assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3] - # use tz-naive cursor - filtered_records = storage.get_event_records( + # use invalid cursor + with pytest.raises(Exception, match="Add a RunShardedEventsCursor to your query filter"): + storage.get_event_records( EventRecordsFilter( event_type=DagsterEventType.RUN_SUCCESS, - after_cursor=RunShardedEventsCursor( - id=0, run_updated_after=update_timestamp.replace(tzinfo=None) - ), # events after first run + after_cursor=0, ), - ascending=True, ) - assert len(filtered_records) == 2 - assert _event_types([r.event_log_entry for r in filtered_records]) == [ - DagsterEventType.RUN_SUCCESS, - DagsterEventType.RUN_SUCCESS, - ] - assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3] - # use invalid cursor - with pytest.raises( - Exception, match="Add a RunShardedEventsCursor to your query filter" - ): - storage.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.RUN_SUCCESS, - after_cursor=0, - ), - ) - - # check that events were double-written to the index shard - with storage.index_connection() as conn: - run_status_change_events = conn.execute( - db_select([SqlEventLogStorageTable.c.id]).where( - SqlEventLogStorageTable.c.dagster_event_type.in_( - [ - event_type.value - for event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.keys() - ] - ) + # check that events were double-written to the index shard + with storage.index_connection() as conn: + run_status_change_events = conn.execute( + db_select([SqlEventLogStorageTable.c.id]).where( + SqlEventLogStorageTable.c.dagster_event_type.in_( + [ + event_type.value + for event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.keys() + ] ) - ).fetchall() - assert len(run_status_change_events) == 6 + ) + ).fetchall() + assert len(run_status_change_events) == 6 # .watch() is async, there's a small chance they don't run before the asserts @pytest.mark.flaky(reruns=1) diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py index 13773b9db52e0..0caf7f6bd1765 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py @@ -31,10 +31,20 @@ def _clean_storage(conn_string): class TestMySQLEventLogStorage(TestEventLogStorage): __test__ = True + @pytest.fixture(name="instance", scope="function") + def instance(self, conn_string): + MySQLEventLogStorage.create_clean_storage(conn_string) + + with instance_for_test( + overrides={"storage": {"mysql": {"mysql_url": conn_string}}} + ) as instance: + yield instance + @pytest.fixture(scope="function", name="storage") - def event_log_storage(self, conn_string): - with _clean_storage(conn_string) as storage: - yield storage + def event_log_storage(self, instance): + event_log_storage = instance.event_log_storage + assert isinstance(event_log_storage, MySQLEventLogStorage) + yield event_log_storage def test_event_log_storage_two_watchers(self, conn_string): with _clean_storage(conn_string) as storage: diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_event_log.py b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_event_log.py index 6286bb66cc78f..77c52fa915037 100644 --- a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_event_log.py +++ b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_event_log.py @@ -30,10 +30,20 @@ def _clean_storage(conn_string): class TestPostgresEventLogStorage(TestEventLogStorage): __test__ = True + @pytest.fixture(name="instance", scope="function") + def instance(self, conn_string): + PostgresEventLogStorage.create_clean_storage(conn_string) + + with instance_for_test( + overrides={"storage": {"postgres": {"postgres_url": conn_string}}} + ) as instance: + yield instance + @pytest.fixture(scope="function", name="storage") - def event_log_storage(self, conn_string): - with _clean_storage(conn_string) as storage: - yield storage + def event_log_storage(self, instance): + event_log_storage = instance.event_log_storage + assert isinstance(event_log_storage, PostgresEventLogStorage) + yield event_log_storage def test_event_log_storage_two_watchers(self, conn_string): with _clean_storage(conn_string) as storage: