Skip to content

Commit

Permalink
Refactor event log storage tests to use instance
Browse files Browse the repository at this point in the history
Summary:
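Refactors the event log storage test fixtures so that each storage is obtained from a DagsterInstance fixture. This allows us to add test cases to TestEventLogStorage that use the instance directly.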

Test Plan: BK
  • Loading branch information
gibsondan committed Aug 9, 2024
1 parent 176c2a7 commit 6499e97
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import pytest
import sqlalchemy
import sqlalchemy as db
from dagster import DagsterInstance
from dagster._core.errors import DagsterEventLogInvalidForRun
from dagster._core.storage.event_log import (
ConsolidatedSqliteEventLogStorage,
InMemoryEventLogStorage,
SqlEventLogStorageMetadata,
SqlEventLogStorageTable,
SqliteEventLogStorage,
Expand All @@ -22,6 +22,7 @@
from dagster._core.storage.sql import create_engine
from dagster._core.storage.sqlalchemy_compat import db_select
from dagster._core.storage.sqlite_storage import DagsterSqliteStorage
from dagster._core.test_utils import instance_for_test
from dagster._core.utils import make_new_run_id
from dagster._utils.test import ConcurrencyEnabledSqliteTestEventLogStorage
from sqlalchemy import __version__ as sqlalchemy_version
Expand All @@ -34,12 +35,13 @@ class TestInMemoryEventLogStorage(TestEventLogStorage):
__test__ = True

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self):
storage = InMemoryEventLogStorage()
try:
yield storage
finally:
storage.dispose()
def event_log_storage(self, instance):
yield instance.event_log_storage

@pytest.fixture(name="instance", scope="function")
def instance(self):
    """Provide an ephemeral DagsterInstance for the duration of one test.

    The instance is created with ``DagsterInstance.ephemeral()`` and the
    surrounding ``with`` block is exited after the test has run.
    """
    with DagsterInstance.ephemeral() as ephemeral_instance:
        yield ephemeral_instance

@pytest.mark.skipif(
sys.version_info >= (3, 12) and sqlalchemy_version.startswith("1.4."),
Expand All @@ -52,16 +54,17 @@ def test_basic_get_logs_for_run_multiple_runs_cursors(self, instance, storage):
class TestSqliteEventLogStorage(TestEventLogStorage):
__test__ = True

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self):
# make the temp dir in the cwd since default temp roots
# have issues with FS notif based event log watching
@pytest.fixture(name="instance", scope="function")
def instance(self):
with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path:
storage = SqliteEventLogStorage(tmpdir_path)
try:
yield storage
finally:
storage.dispose()
with instance_for_test(temp_dir=tmpdir_path) as instance:
yield instance

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self, instance):
    """Yield the event log storage owned by the ``instance`` fixture.

    The storage is checked to be a ``SqliteEventLogStorage`` before it is
    handed to the test, so a misconfigured instance fails in the fixture
    rather than somewhere inside a test body.
    """
    event_log_storage = instance.event_log_storage
    assert isinstance(event_log_storage, SqliteEventLogStorage)
    # Yield the object that was just type-checked (not a second attribute
    # lookup), matching the other storage fixtures in this file.
    yield event_log_storage

def test_filesystem_event_log_storage_run_corrupted(self, storage):
# URL begins sqlite:///
Expand Down Expand Up @@ -134,35 +137,47 @@ def test_concurrent_sqlite_event_log_connections(self, storage):
class TestConsolidatedSqliteEventLogStorage(TestEventLogStorage):
__test__ = True

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self):
# make the temp dir in the cwd since default temp roots
# have issues with FS notif based event log watching
@pytest.fixture(name="instance", scope="function")
def instance(self):
with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path:
storage = ConsolidatedSqliteEventLogStorage(tmpdir_path)
try:
yield storage
finally:
storage.dispose()
with instance_for_test(
temp_dir=tmpdir_path,
overrides={
"event_log_storage": {
"module": "dagster.core.storage.event_log",
"class": "ConsolidatedSqliteEventLogStorage",
"config": {"base_dir": tmpdir_path},
}
},
) as instance:
yield instance

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self, instance):
    """Yield the instance's event log storage.

    Asserts that the storage is a ``ConsolidatedSqliteEventLogStorage``
    before yielding it to the test.
    """
    consolidated_storage = instance.event_log_storage
    assert isinstance(consolidated_storage, ConsolidatedSqliteEventLogStorage)
    yield consolidated_storage


class TestLegacyStorage(TestEventLogStorage):
__test__ = True

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self):
# make the temp dir in the cwd since default temp roots
# have issues with FS notif based event log watching
@pytest.fixture(name="instance", scope="function")
def instance(self):
with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path:
# first create the unified storage class
storage = DagsterSqliteStorage.from_local(tmpdir_path)
# next create the legacy adapter class
legacy_storage = LegacyEventLogStorage(storage)
try:
yield legacy_storage
finally:
legacy_storage.dispose()
storage.dispose()
with instance_for_test(temp_dir=tmpdir_path) as instance:
yield instance

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self, instance):
    """Yield a ``LegacyEventLogStorage`` built on the instance ref's storage.

    The storage taken from ``instance.get_ref()`` is asserted to be a
    ``DagsterSqliteStorage``, wrapped in the legacy adapter, and the adapter
    is registered with ``instance`` before the test runs. The adapter is
    disposed once the test finishes, whether or not it passed.
    """
    unified_storage = instance.get_ref().storage
    assert isinstance(unified_storage, DagsterSqliteStorage)

    legacy_adapter = LegacyEventLogStorage(unified_storage)
    legacy_adapter.register_instance(instance)
    try:
        yield legacy_adapter
    finally:
        legacy_adapter.dispose()

def is_sqlite(self, storage):
    """Return True unconditionally, regardless of the ``storage`` passed in."""
    return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1999,7 +1999,7 @@ def _get_storage_ids(result):
)
assert _get_storage_ids(result) == [storage_id_3, storage_id_1]

def test_get_event_records_sqlite(self, storage):
def test_get_event_records_sqlite(self, storage, instance):
if not self.is_sqlite(storage):
pytest.skip()

Expand Down Expand Up @@ -2032,119 +2032,109 @@ def materialize_one(_):
def a_job():
materialize_one()

with instance_for_test() as instance:
if not storage.has_instance:
storage.register_instance(instance)

# first run
execute_run(
InMemoryJob(a_job),
instance.create_run_for_job(
a_job,
run_id=run_id_1,
run_config={"loggers": {"callback": {}, "console": {}}},
),
instance,
)
instance.wipe()

for event in events:
storage.store_event(event)

run_records = instance.get_run_records()
assert len(run_records) == 1
# first run
execute_run(
InMemoryJob(a_job),
instance.create_run_for_job(
a_job,
run_id=run_id_1,
run_config={"loggers": {"callback": {}, "console": {}}},
),
instance,
)

# second run
events = []
execute_run(
InMemoryJob(a_job),
instance.create_run_for_job(
a_job,
run_id=run_id_2,
run_config={"loggers": {"callback": {}, "console": {}}},
),
instance,
)
run_records = instance.get_run_records()
assert len(run_records) == 2
for event in events:
storage.store_event(event)
run_records = instance.get_run_records()
assert len(run_records) == 1

# third run
events = []
execute_run(
InMemoryJob(a_job),
instance.create_run_for_job(
a_job,
run_id=run_id_3,
run_config={"loggers": {"callback": {}, "console": {}}},
),
instance,
)
run_records = instance.get_run_records()
assert len(run_records) == 3
for event in events:
storage.store_event(event)
# second run
events = []
execute_run(
InMemoryJob(a_job),
instance.create_run_for_job(
a_job,
run_id=run_id_2,
run_config={"loggers": {"callback": {}, "console": {}}},
),
instance,
)
run_records = instance.get_run_records()
assert len(run_records) == 2

update_timestamp = run_records[-1].update_timestamp
# third run
events = []
execute_run(
InMemoryJob(a_job),
instance.create_run_for_job(
a_job,
run_id=run_id_3,
run_config={"loggers": {"callback": {}, "console": {}}},
),
instance,
)
run_records = instance.get_run_records()
assert len(run_records) == 3

update_timestamp = run_records[-1].update_timestamp

# use tz-aware cursor
filtered_records = storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=RunShardedEventsCursor(
id=0, run_updated_after=update_timestamp
), # events after first run
),
ascending=True,
)

# use tz-aware cursor
filtered_records = storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=RunShardedEventsCursor(
id=0, run_updated_after=update_timestamp
), # events after first run
),
ascending=True,
)
assert len(filtered_records) == 2
assert _event_types([r.event_log_entry for r in filtered_records]) == [
DagsterEventType.RUN_SUCCESS,
DagsterEventType.RUN_SUCCESS,
]
assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3]
assert len(filtered_records) == 2
assert _event_types([r.event_log_entry for r in filtered_records]) == [
DagsterEventType.RUN_SUCCESS,
DagsterEventType.RUN_SUCCESS,
]
assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3]

# use tz-naive cursor
filtered_records = storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=RunShardedEventsCursor(
id=0, run_updated_after=update_timestamp.replace(tzinfo=None)
), # events after first run
),
ascending=True,
)
assert len(filtered_records) == 2
assert _event_types([r.event_log_entry for r in filtered_records]) == [
DagsterEventType.RUN_SUCCESS,
DagsterEventType.RUN_SUCCESS,
]
assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3]

# use tz-naive cursor
filtered_records = storage.get_event_records(
# use invalid cursor
with pytest.raises(Exception, match="Add a RunShardedEventsCursor to your query filter"):
storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=RunShardedEventsCursor(
id=0, run_updated_after=update_timestamp.replace(tzinfo=None)
), # events after first run
after_cursor=0,
),
ascending=True,
)
assert len(filtered_records) == 2
assert _event_types([r.event_log_entry for r in filtered_records]) == [
DagsterEventType.RUN_SUCCESS,
DagsterEventType.RUN_SUCCESS,
]
assert [r.event_log_entry.run_id for r in filtered_records] == [run_id_2, run_id_3]

# use invalid cursor
with pytest.raises(
Exception, match="Add a RunShardedEventsCursor to your query filter"
):
storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=0,
),
)

# check that events were double-written to the index shard
with storage.index_connection() as conn:
run_status_change_events = conn.execute(
db_select([SqlEventLogStorageTable.c.id]).where(
SqlEventLogStorageTable.c.dagster_event_type.in_(
[
event_type.value
for event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.keys()
]
)
# check that events were double-written to the index shard
with storage.index_connection() as conn:
run_status_change_events = conn.execute(
db_select([SqlEventLogStorageTable.c.id]).where(
SqlEventLogStorageTable.c.dagster_event_type.in_(
[
event_type.value
for event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.keys()
]
)
).fetchall()
assert len(run_status_change_events) == 6
)
).fetchall()
assert len(run_status_change_events) == 6

# .watch() is async, there's a small chance they don't run before the asserts
@pytest.mark.flaky(reruns=1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,20 @@ def _clean_storage(conn_string):
class TestMySQLEventLogStorage(TestEventLogStorage):
__test__ = True

@pytest.fixture(name="instance", scope="function")
def instance(self, conn_string):
    """Yield a test instance whose storage is MySQL at ``conn_string``.

    ``MySQLEventLogStorage.create_clean_storage`` is called on the
    connection string first, before the instance is created.
    """
    MySQLEventLogStorage.create_clean_storage(conn_string)

    mysql_overrides = {"storage": {"mysql": {"mysql_url": conn_string}}}
    with instance_for_test(overrides=mysql_overrides) as mysql_instance:
        yield mysql_instance

@pytest.fixture(scope="function", name="storage")
def event_log_storage(self, conn_string):
with _clean_storage(conn_string) as storage:
yield storage
def event_log_storage(self, instance):
event_log_storage = instance.event_log_storage
assert isinstance(event_log_storage, MySQLEventLogStorage)
yield event_log_storage

def test_event_log_storage_two_watchers(self, conn_string):
with _clean_storage(conn_string) as storage:
Expand Down
Loading

0 comments on commit 6499e97

Please sign in to comment.