Skip to content

Commit

Permalink
remove intersect from event storage (#17492)
Browse files Browse the repository at this point in the history
MySQL 8.0.35 includes a change that breaks our usage of `intersect`. We
primarily added `intersect` support for run tags, but no longer use it there.
Let's remove it from event log storage too.

I don't have the full context here. @prha et al., could you give this
some extra scrutiny?
  • Loading branch information
johannkm authored Oct 31, 2023
1 parent d292ae8 commit bf62107
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,23 +878,6 @@ def _apply_filter_to_query(
isinstance(event_records_filter.asset_key, AssetKey),
"Asset key must be set in event records filter to filter by tags.",
)
if self.supports_intersect:
intersections = [
db_select([AssetEventTagsTable.c.event_id]).where(
db.and_(
AssetEventTagsTable.c.asset_key
== event_records_filter.asset_key.to_string(), # type: ignore # (bad sig?)
AssetEventTagsTable.c.key == key,
(
AssetEventTagsTable.c.value == value
if isinstance(value, str)
else AssetEventTagsTable.c.value.in_(value)
),
)
)
for key, value in event_records_filter.tags.items()
]
query = query.where(SqlEventLogStorageTable.c.id.in_(db.intersect(*intersections)))

return query

Expand Down Expand Up @@ -952,11 +935,7 @@ def _get_event_records(
else:
asset_details = None

if (
event_records_filter.tags
and not self.supports_intersect
and self.has_table(AssetEventTagsTable.name)
):
if event_records_filter.tags and self.has_table(AssetEventTagsTable.name):
table = self._apply_tags_table_joins(
SqlEventLogStorageTable, event_records_filter.tags, event_records_filter.asset_key
)
Expand Down Expand Up @@ -1018,10 +997,6 @@ def _get_event_records(
def supports_event_consumer_queries(self) -> bool:
return True

@property
def supports_intersect(self) -> bool:
return True

def _get_event_records_result(
self,
event_records_filter: EventRecordsFilter,
Expand Down Expand Up @@ -1692,39 +1667,6 @@ def get_event_tags_for_asset(
AssetEventTagsTable.c.event_timestamp
> datetime.utcfromtimestamp(asset_details.last_wipe_timestamp)
)
elif self.supports_intersect:

def get_tag_filter_query(tag_key, tag_value):
filter_query = db_select([AssetEventTagsTable.c.event_id]).where(
db.and_(
AssetEventTagsTable.c.asset_key == asset_key.to_string(),
AssetEventTagsTable.c.key == tag_key,
AssetEventTagsTable.c.value == tag_value,
)
)
if asset_details and asset_details.last_wipe_timestamp:
filter_query = filter_query.where(
AssetEventTagsTable.c.event_timestamp
> datetime.utcfromtimestamp(asset_details.last_wipe_timestamp)
)
return filter_query

intersections = [
get_tag_filter_query(tag_key, tag_value)
for tag_key, tag_value in filter_tags.items()
]

tags_query = db_select(
[
AssetEventTagsTable.c.key,
AssetEventTagsTable.c.value,
AssetEventTagsTable.c.event_id,
]
).where(
db.and_(
AssetEventTagsTable.c.event_id.in_(db.intersect(*intersections)),
)
)
else:
table = self._apply_tags_table_joins(AssetEventTagsTable, filter_tags, asset_key)
tags_query = db_select(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,6 @@ def _add_cursor_limit_to_query(

return query

@property
def supports_intersect(self) -> bool:
return True

def _add_filters_to_query(self, query: SqlAlchemyQuery, filters: RunsFilter) -> SqlAlchemyQuery:
check.inst_param(filters, "filters", RunsFilter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@
mysql_alembic_config,
mysql_isolation_level,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)

MINIMUM_MYSQL_INTERSECT_VERSION = "8.0.31"


class MySQLEventLogStorage(SqlEventLogStorage, ConfigurableClass):
"""MySQL-backed event log storage.
Expand Down Expand Up @@ -208,12 +205,6 @@ def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) ->
def end_watch(self, run_id: str, handler: EventHandlerFn) -> None:
self._event_watcher.unwatch_run(run_id, handler)

@property
def supports_intersect(self) -> bool:
return parse_mysql_version(self._mysql_version) >= parse_mysql_version( # type: ignore # (possible none)
MINIMUM_MYSQL_INTERSECT_VERSION
)

@property
def event_watcher(self) -> SqlPollingEventWatcher:
return self._event_watcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
mysql_alembic_config,
mysql_isolation_level,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)

MINIMUM_MYSQL_BUCKET_VERSION = "8.0.0"
MINIMUM_MYSQL_INTERSECT_VERSION = "8.0.31"


class MySQLRunStorage(SqlRunStorage, ConfigurableClass):
Expand Down Expand Up @@ -158,12 +156,6 @@ def mark_index_built(self, migration_name: str) -> None:
if migration_name in self._index_migration_cache:
del self._index_migration_cache[migration_name]

@property
def supports_intersect(self) -> bool:
return parse_mysql_version(self._mysql_version) >= parse_mysql_version( # type: ignore
MINIMUM_MYSQL_INTERSECT_VERSION
)

def add_daemon_heartbeat(self, daemon_heartbeat: DaemonHeartbeat) -> None:
with self.connect() as conn:
conn.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,3 @@ def test_load_from_config(self, conn_string):
from_explicit = explicit_instance._event_storage # noqa: SLF001

assert from_url.mysql_url == from_explicit.mysql_url

def test_materialization_tag_on_wipe(self, storage, instance):
pytest.skip("Running in to error we haven't tracked down yet. Skipping for now.")

0 comments on commit bf62107

Please sign in to comment.