diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index 19b504695c3bd..88528d228637f 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -878,23 +878,6 @@ def _apply_filter_to_query( isinstance(event_records_filter.asset_key, AssetKey), "Asset key must be set in event records filter to filter by tags.", ) - if self.supports_intersect: - intersections = [ - db_select([AssetEventTagsTable.c.event_id]).where( - db.and_( - AssetEventTagsTable.c.asset_key - == event_records_filter.asset_key.to_string(), # type: ignore # (bad sig?) - AssetEventTagsTable.c.key == key, - ( - AssetEventTagsTable.c.value == value - if isinstance(value, str) - else AssetEventTagsTable.c.value.in_(value) - ), - ) - ) - for key, value in event_records_filter.tags.items() - ] - query = query.where(SqlEventLogStorageTable.c.id.in_(db.intersect(*intersections))) return query @@ -952,11 +935,7 @@ def _get_event_records( else: asset_details = None - if ( - event_records_filter.tags - and not self.supports_intersect - and self.has_table(AssetEventTagsTable.name) - ): + if event_records_filter.tags and self.has_table(AssetEventTagsTable.name): table = self._apply_tags_table_joins( SqlEventLogStorageTable, event_records_filter.tags, event_records_filter.asset_key ) @@ -1018,10 +997,6 @@ def _get_event_records( def supports_event_consumer_queries(self) -> bool: return True - @property - def supports_intersect(self) -> bool: - return True - def _get_event_records_result( self, event_records_filter: EventRecordsFilter, @@ -1692,39 +1667,6 @@ def get_event_tags_for_asset( AssetEventTagsTable.c.event_timestamp > datetime.utcfromtimestamp(asset_details.last_wipe_timestamp) ) - elif self.supports_intersect: - - def get_tag_filter_query(tag_key, tag_value): - filter_query = db_select([AssetEventTagsTable.c.event_id]).where( - db.and_( - AssetEventTagsTable.c.asset_key == asset_key.to_string(), - AssetEventTagsTable.c.key == tag_key, - AssetEventTagsTable.c.value == tag_value, - ) - ) - if asset_details and asset_details.last_wipe_timestamp: - filter_query = filter_query.where( - AssetEventTagsTable.c.event_timestamp - > datetime.utcfromtimestamp(asset_details.last_wipe_timestamp) - ) - return filter_query - - intersections = [ - get_tag_filter_query(tag_key, tag_value) - for tag_key, tag_value in filter_tags.items() - ] - - tags_query = db_select( - [ - AssetEventTagsTable.c.key, - AssetEventTagsTable.c.value, - AssetEventTagsTable.c.event_id, - ] - ).where( - db.and_( - AssetEventTagsTable.c.event_id.in_(db.intersect(*intersections)), - ) - ) else: table = self._apply_tags_table_joins(AssetEventTagsTable, filter_tags, asset_key) tags_query = db_select( diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 1f0953f2eb4ae..15cfd2871f581 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -237,10 +237,6 @@ def _add_cursor_limit_to_query( return query - @property - def supports_intersect(self) -> bool: - return True - def _add_filters_to_query(self, query: SqlAlchemyQuery, filters: RunsFilter) -> SqlAlchemyQuery: check.inst_param(filters, "filters", RunsFilter) diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py b/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py index 94aa98e7b4b18..6c9a16800e8ae 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/event_log/event_log.py @@ -32,13 +32,10 @@ mysql_alembic_config, mysql_isolation_level, mysql_url_from_config, - parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) -MINIMUM_MYSQL_INTERSECT_VERSION = "8.0.31" - class MySQLEventLogStorage(SqlEventLogStorage, ConfigurableClass): """MySQL-backed event log storage. @@ -208,12 +205,6 @@ def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) -> def end_watch(self, run_id: str, handler: EventHandlerFn) -> None: self._event_watcher.unwatch_run(run_id, handler) - @property - def supports_intersect(self) -> bool: - return parse_mysql_version(self._mysql_version) >= parse_mysql_version( # type: ignore # (possible none) - MINIMUM_MYSQL_INTERSECT_VERSION - ) - @property def event_watcher(self) -> SqlPollingEventWatcher: return self._event_watcher diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py index ba2ca1bad8385..2bc57ba175207 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql/run_storage/run_storage.py @@ -30,13 +30,11 @@ mysql_alembic_config, mysql_isolation_level, mysql_url_from_config, - parse_mysql_version, retry_mysql_connection_fn, retry_mysql_creation_fn, ) MINIMUM_MYSQL_BUCKET_VERSION = "8.0.0" -MINIMUM_MYSQL_INTERSECT_VERSION = "8.0.31" class MySQLRunStorage(SqlRunStorage, ConfigurableClass): @@ -158,12 +156,6 @@ def mark_index_built(self, migration_name: str) -> None: if migration_name in self._index_migration_cache: del self._index_migration_cache[migration_name] - @property - def supports_intersect(self) -> bool: - return parse_mysql_version(self._mysql_version) >= parse_mysql_version( # type: ignore - MINIMUM_MYSQL_INTERSECT_VERSION - ) - def add_daemon_heartbeat(self, daemon_heartbeat: DaemonHeartbeat) -> None: with self.connect() as conn: conn.execute( diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py index f5528ecdd1ce5..8a74459827044 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_event_log.py @@ -116,6 +116,3 @@ def test_load_from_config(self, conn_string): from_explicit = explicit_instance._event_storage # noqa: SLF001 assert from_url.mysql_url == from_explicit.mysql_url - - def test_materialization_tag_on_wipe(self, storage, instance): - pytest.skip("Running in to error we haven't tracked down yet. Skipping for now.")