Skip to content

Commit

Permalink
refactor(backends): clean up resources produced by memtable (#10055)
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored Sep 10, 2024
1 parent 62c63d2 commit 019cae5
Show file tree
Hide file tree
Showing 17 changed files with 138 additions and 92 deletions.
15 changes: 15 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import abc
import collections.abc
import contextlib
import functools
import importlib.metadata
import keyword
Expand Down Expand Up @@ -1116,13 +1117,27 @@ def _register_in_memory_tables(self, expr: ir.Expr) -> None:
for memtable in expr.op().find(ops.InMemoryTable):
if not self._in_memory_table_exists(memtable.name):
self._register_in_memory_table(memtable)
weakref.finalize(
memtable, self._finalize_in_memory_table, memtable.name
)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if self.supports_in_memory_tables:
raise NotImplementedError(
f"{self.name} must implement `_register_in_memory_table` to support in-memory tables"
)

def _finalize_in_memory_table(self, name: str) -> None:
"""Wrap `_finalize_memtable` to suppress exceptions."""
with contextlib.suppress(Exception):
self._finalize_memtable(name)

def _finalize_memtable(self, name: str) -> None:
if self.supports_in_memory_tables:
raise NotImplementedError(
f"{self.name} must implement `_finalize_memtable` to support in-memory tables"
)

def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
"""Backend-specific hooks to run before an expression is executed."""
self._register_udfs(expr)
Expand Down
11 changes: 11 additions & 0 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ def _in_memory_table_exists(self, name: str) -> bool:
else:
return True

def _finalize_memtable(self, name: str) -> None:
session_dataset = self._session_dataset
table_id = sg.table(
name,
db=session_dataset.dataset_id,
catalog=session_dataset.project,
quoted=False,
)
drop_sql_stmt = sge.Drop(kind="TABLE", this=table_id, exists=True)
self.raw_sql(drop_sql_stmt)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
session_dataset = self._session_dataset

Expand Down
13 changes: 5 additions & 8 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import urllib
import warnings
import weakref
from operator import itemgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -160,12 +159,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog = "temp"

temp_memtable_view = None

if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -234,9 +230,6 @@ def create_table(
).sql(self.name)
)

if temp_memtable_view is not None:
self.con.unregister(temp_memtable_view)

return self.table(name, database=(catalog, database))

def table(
Expand Down Expand Up @@ -1620,11 +1613,15 @@ def _in_memory_table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self.con.register(op.name, op.data.to_pyarrow(op.schema))

def _finalize_memtable(self, name: str) -> None:
# if we don't aggressively unregister tables duckdb will keep a
# reference to every memtable ever registered, even if there's no
# way for a user to access the operation anymore, resulting in a
# memory leak
weakref.finalize(op, self.con.unregister, op.name)
#
# we can't use drop_table, because self.con.register creates a view, so
# use the corresponding unregister method
self.con.unregister(name)

def _register_udfs(self, expr: ir.Expr) -> None:
con = self.con
Expand Down
29 changes: 12 additions & 17 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import atexit
import contextlib
import datetime
import re
Expand Down Expand Up @@ -42,7 +41,6 @@ class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
compiler = sc.exasol.compiler
supports_temporary_tables = False
supports_create_or_replace = False
supports_in_memory_tables = False
supports_python_udfs = False

@property
Expand Down Expand Up @@ -278,14 +276,15 @@ def process_item(item: Any):
with self._safe_raw_sql(create_stmt_sql):
if not df.empty:
self.con.ext.insert_multi(name, rows)
atexit.register(self._clean_up_tmp_table, ident)

def _clean_up_tmp_table(self, ident: sge.Identifier) -> None:
with self._safe_raw_sql(
sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
):
def _clean_up_tmp_table(self, name: str) -> None:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
with self._safe_raw_sql(sql):
pass

_finalize_memtable = _clean_up_tmp_table

def create_table(
self,
name: str,
Expand Down Expand Up @@ -334,11 +333,9 @@ def create_table(

quoted = self.compiler.quoted

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -356,31 +353,29 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(kind="TABLE", this=target)

this = sg.table(name, catalog=database, quoted=quoted)
with self._safe_raw_sql(create_stmt):
if query is not None:
self.con.execute(
sge.Insert(this=table, expression=query).sql(self.name)
sge.Insert(this=table_expr, expression=query).sql(self.name)
)

if overwrite:
self.con.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
self.con.execute(
f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}"
f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}"
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down
19 changes: 8 additions & 11 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog, db = None, None

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -647,19 +645,22 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted
)
quoted = self.compiler.quoted
raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
target = sge.Schema(
this=sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted
),
expressions=schema.to_sqlglot(self.dialect),
)

create_stmt = sge.Create(
kind="TABLE",
this=target,
properties=sge.Properties(expressions=properties),
)

this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted)
this = sg.table(name, catalog=catalog, db=db, quoted=quoted)
raw_this = sg.table(name, catalog=catalog, db=db, quoted=False)
with self._safe_ddl(create_stmt) as cur:
if query is not None:
Expand Down Expand Up @@ -692,10 +693,6 @@ def create_table(
db = "dbo"

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=(catalog, db))

# preserve the input schema if it was provided
Expand Down
19 changes: 15 additions & 4 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,10 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(
kind="TABLE",
Expand All @@ -437,15 +439,17 @@ def create_table(
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.name)
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.name
)
cur.execute(insert_stmt)

if overwrite:
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}"
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
)

if schema is None:
Expand Down Expand Up @@ -538,3 +542,10 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
raise
df = MySQLPandasData.convert_table(df, schema)
return df

def _finalize_memtable(self, name: str) -> None:
"""No-op.
Executing **any** SQL in a finalizer causes the underlying connection
socket to be set to `None`. It is unclear why this happens.
"""
23 changes: 10 additions & 13 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import atexit
import contextlib
import re
import warnings
Expand Down Expand Up @@ -419,11 +418,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -468,10 +465,6 @@ def create_table(
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down Expand Up @@ -527,8 +520,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
insert_stmt, list(data.iloc[start:end].itertuples(index=False))
)

atexit.register(self._clean_up_tmp_table, name)

def _get_schema_using_query(self, query: str) -> sch.Schema:
name = util.gen_name("oracle_metadata")
dialect = self.name
Expand Down Expand Up @@ -608,6 +599,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
return OraclePandasData.convert_table(df, schema)

def _clean_up_tmp_table(self, name: str) -> None:
dialect = self.dialect

ident = sg.to_identifier(name, quoted=self.compiler.quoted)

truncate = sge.TruncateTable(expressions=[ident]).sql(dialect)
drop = sge.Drop(kind="TABLE", this=ident).sql(dialect)

with self.begin() as bind:
# global temporary tables cannot be dropped without first truncating them
#
Expand All @@ -616,9 +614,8 @@ def _clean_up_tmp_table(self, name: str) -> None:
# ignore DatabaseError exceptions because the table may not exist
# because it's already been deleted
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'TRUNCATE TABLE "{name}"')
bind.execute(truncate)
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'DROP TABLE "{name}"')
bind.execute(drop)

def _drop_cached_table(self, name):
self._clean_up_tmp_table(name)
_finalize_memtable = _drop_cached_table = _clean_up_tmp_table
3 changes: 3 additions & 0 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs):
def _create_cached_table(self, name, expr):
return self.create_table(name, expr.execute())

def _finalize_memtable(self, name: str) -> None:
"""No-op, let Python handle clean up."""


@lazy_singledispatch
def _convert_object(obj: Any, _conn):
Expand Down
3 changes: 3 additions & 0 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def _in_memory_table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self._add_table(op.name, op.data.to_polars(op.schema).lazy())

def _finalize_memtable(self, name: str) -> None:
self.drop_table(name, force=True)

@deprecated(
as_of="9.1",
instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
Expand Down
Loading

0 comments on commit 019cae5

Please sign in to comment.