Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(backends): speed up most memtable existence checks #10067

Merged
merged 11 commits into from
Sep 11, 2024
20 changes: 20 additions & 0 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,3 +773,23 @@
with self._safe_raw_sql(src, external_tables=external_tables):
pass
return self.table(name, database=database)

def _in_memory_table_exists(self, name: str) -> bool:
name = sg.table(name, quoted=self.compiler.quoted).sql(self.dialect)
try:
# DESCRIBE TABLE $TABLE FORMAT NULL is the fastest way to check
# table existence in clickhouse; FORMAT NULL produces no data which
# is ideal since we don't care about the output for existence
# checking
#
# Other methods compared were
# 1. SELECT 1 FROM $TABLE LIMIT 0
# 2. SHOW TABLES LIKE $TABLE LIMIT 1
#
# if the table exists nothing is returned and there's no error
# otherwise there's an error
self.con.raw_query(f"DESCRIBE {name} FORMAT NULL")
except cc.driver.exceptions.DatabaseError:
return False
else:
return True

Check warning on line 795 in ibis/backends/clickhouse/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/clickhouse/__init__.py#L795

Added line #L795 was not covered by tests
3 changes: 3 additions & 0 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
finally:
self.con.execute(drop_view)

def _in_memory_table_exists(self, name: str) -> bool:
return self.con.meta.table_exists(name)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
10 changes: 10 additions & 0 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,16 @@ def create_table(
namespace=ops.Namespace(catalog=catalog, database=db),
).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
# The single character U here means user-defined table
# see https://learn.microsoft.com/en-us/sql/relational-databases/system-catalog-views/sys-objects-transact-sql?view=sql-server-ver16
sql = sg.select(sg.func("object_id", sge.convert(name), sge.convert("U"))).sql(
self.dialect
)
with self.begin() as cur:
[(result,)] = cur.execute(sql).fetchall()
return result is not None

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
18 changes: 18 additions & 0 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pymysql
import sqlglot as sg
import sqlglot.expressions as sge
from pymysql.constants import ER

import ibis
import ibis.backends.sql.compilers as sc
Expand Down Expand Up @@ -465,6 +466,23 @@
name, schema=schema, source=self, namespace=ops.Namespace(database=database)
).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
name = sg.to_identifier(name, quoted=self.compiler.quoted).sql(self.dialect)
# just return the single field with column names; no need to bring back
# everything if the command succeeds
sql = f"SHOW COLUMNS FROM {name} LIKE 'Field'"
try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except pymysql.err.ProgrammingError as e:
err_code, _ = e.args
if err_code == ER.NO_SUCH_TABLE:
return False
raise

Check warning on line 482 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L482

Added line #L482 was not covered by tests
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
17 changes: 16 additions & 1 deletion ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ibis import util
from ibis.backends import CanListDatabase, CanListSchema
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR, C
from ibis.backends.sql.compilers.base import NULL, STAR, C

if TYPE_CHECKING:
from urllib.parse import ParseResult
Expand Down Expand Up @@ -495,6 +495,21 @@ def drop_table(

super().drop_table(name, database=(catalog, db), force=force)

def _in_memory_table_exists(self, name: str) -> bool:
sql = (
sg.select(NULL)
.from_(sg.to_identifier("USER_OBJECTS", quoted=self.compiler.quoted))
.where(
C.OBJECT_TYPE.eq(sge.convert("TABLE")),
C.OBJECT_NAME.eq(sge.convert(name)),
)
.limit(sge.convert(1))
.sql(self.dialect)
)
with self.begin() as cur:
results = cur.execute(sql).fetchall()
return bool(results)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema

Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ def _from_url(self, url: ParseResult, **kwargs):

return self.connect(**kwargs)

def _in_memory_table_exists(self, name: str) -> bool:
import psycopg2.errors

ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except psycopg2.errors.UndefinedTable:
return False
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
from psycopg2.extras import execute_batch

Expand Down
7 changes: 7 additions & 0 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,18 @@ def _register_udfs(self, expr: ir.Expr) -> None:
self._session.udf.register(f"unwrap_json_{typ.__name__}", unwrap_json(typ))
self._session.udf.register("unwrap_json_float", unwrap_json_float)

def _in_memory_table_exists(self, name: str) -> bool:
sql = f"SHOW TABLES IN {self.current_database} LIKE '{name}'"
Copy link
Member Author

@cpcloud cpcloud Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow self._session.catalog.tableExists doesn't give the same answer as the code I have here, even when passing a fully quoted default.$memtable (the second dbName parameter is deprecated and the deprecation warning suggests passing the fully qualified name instead.)

return bool(self._session.sql(sql).count())

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = PySparkSchema.from_ibis(op.schema)
df = self._session.createDataFrame(data=op.data.to_frame(), schema=schema)
df.createTempView(op.name)

def _finalize_memtable(self, name: str) -> None:
self._session.catalog.dropTempView(name)

@contextlib.contextmanager
def _safe_raw_sql(self, query: str) -> Any:
yield self.raw_sql(query)
Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,21 @@ def create_table(
name, schema=schema, source=self, namespace=ops.Namespace(database=database)
).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
import psycopg2.errors

ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except psycopg2.errors.InternalError:
return False
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
20 changes: 17 additions & 3 deletions ibis/backends/snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,23 @@
return self._filter_with_like(tables + views, like=like)

def _in_memory_table_exists(self, name: str) -> bool:
with self.con.cursor() as con:
result = con.execute(f"SHOW TABLES LIKE '{name}'").fetchone()
return bool(result)
import snowflake.connector

Check warning on line 648 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L648

Added line #L648 was not covered by tests

ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

Check warning on line 651 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L650-L651

Added lines #L650 - L651 were not covered by tests

try:

Check warning on line 653 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L653

Added line #L653 was not covered by tests
with self.con.cursor() as cur:
cur.execute(sql).fetchall()
except snowflake.connector.errors.ProgrammingError as e:

Check warning on line 656 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L655-L656

Added lines #L655 - L656 were not covered by tests
# this cryptic error message is the only generic and reliable way
# to tell if the error means "table not found for any reason"
# otherwise, we need to reraise the exception
if e.sqlstate == "42S02":
return False
raise

Check warning on line 662 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L661-L662

Added lines #L661 - L662 were not covered by tests
else:
return True

Check warning on line 664 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L664

Added line #L664 was not covered by tests

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
import pyarrow.parquet as pq
Expand Down
12 changes: 12 additions & 0 deletions ibis/backends/sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,18 @@ def _generate_create_table(self, table: sge.Table, schema: sch.Schema):

return sge.Create(kind="TABLE", this=target)

def _in_memory_table_exists(self, name: str) -> bool:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
query = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)
try:
with self.begin() as cur:
cur.execute(query)
cur.fetchall()
except sqlite3.OperationalError:
return False
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
table = sg.table(op.name, quoted=self.compiler.quoted, catalog="temp")
create_stmt = self._generate_create_table(table, op.schema).sql(self.name)
Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/trino/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,21 @@
df = TrinoPandasData.convert_table(df, schema)
return df

def _in_memory_table_exists(self, name: str) -> bool:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except trino.exceptions.TrinoUserError as e:
if e.error_name == "TABLE_NOT_FOUND":
return False
raise

Check warning on line 566 in ibis/backends/trino/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/trino/__init__.py#L566

Added line #L566 was not covered by tests
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
Loading