Skip to content

Commit

Permalink
perf(backends): speed up most memtable existence checks (ibis-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored and ncclementi committed Sep 24, 2024
1 parent 2b22433 commit 20e0bb7
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 4 deletions.
20 changes: 20 additions & 0 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,3 +773,23 @@ def create_view(
with self._safe_raw_sql(src, external_tables=external_tables):
pass
return self.table(name, database=database)

def _in_memory_table_exists(self, name: str) -> bool:
name = sg.table(name, quoted=self.compiler.quoted).sql(self.dialect)
try:
# DESCRIBE TABLE $TABLE FORMAT NULL is the fastest way to check
# table existence in clickhouse; FORMAT NULL produces no data which
# is ideal since we don't care about the output for existence
# checking
#
# Other methods compared were
# 1. SELECT 1 FROM $TABLE LIMIT 0
# 2. SHOW TABLES LIKE $TABLE LIMIT 1
#
# if the table exists nothing is returned and there's no error
# otherwise there's an error
self.con.raw_query(f"DESCRIBE {name} FORMAT NULL")
except cc.driver.exceptions.DatabaseError:
return False
else:
return True
3 changes: 3 additions & 0 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
finally:
self.con.execute(drop_view)

def _in_memory_table_exists(self, name: str) -> bool:
return self.con.meta.table_exists(name)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
10 changes: 10 additions & 0 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,16 @@ def create_table(
namespace=ops.Namespace(catalog=catalog, database=db),
).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
# The single character U here means user-defined table
# see https://learn.microsoft.com/en-us/sql/relational-databases/system-catalog-views/sys-objects-transact-sql?view=sql-server-ver16
sql = sg.select(sg.func("object_id", sge.convert(name), sge.convert("U"))).sql(
self.dialect
)
with self.begin() as cur:
[(result,)] = cur.execute(sql).fetchall()
return result is not None

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
18 changes: 18 additions & 0 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pymysql
import sqlglot as sg
import sqlglot.expressions as sge
from pymysql.constants import ER

import ibis
import ibis.backends.sql.compilers as sc
Expand Down Expand Up @@ -465,6 +466,23 @@ def create_table(
name, schema=schema, source=self, namespace=ops.Namespace(database=database)
).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
name = sg.to_identifier(name, quoted=self.compiler.quoted).sql(self.dialect)
# just return the single field with column names; no need to bring back
# everything if the command succeeds
sql = f"SHOW COLUMNS FROM {name} LIKE 'Field'"
try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except pymysql.err.ProgrammingError as e:
err_code, _ = e.args
if err_code == ER.NO_SUCH_TABLE:
return False
raise
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
17 changes: 16 additions & 1 deletion ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ibis import util
from ibis.backends import CanListDatabase, CanListSchema
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR, C
from ibis.backends.sql.compilers.base import NULL, STAR, C

if TYPE_CHECKING:
from urllib.parse import ParseResult
Expand Down Expand Up @@ -495,6 +495,21 @@ def drop_table(

super().drop_table(name, database=(catalog, db), force=force)

def _in_memory_table_exists(self, name: str) -> bool:
sql = (
sg.select(NULL)
.from_(sg.to_identifier("USER_OBJECTS", quoted=self.compiler.quoted))
.where(
C.OBJECT_TYPE.eq(sge.convert("TABLE")),
C.OBJECT_NAME.eq(sge.convert(name)),
)
.limit(sge.convert(1))
.sql(self.dialect)
)
with self.begin() as cur:
results = cur.execute(sql).fetchall()
return bool(results)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema

Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ def _from_url(self, url: ParseResult, **kwargs):

return self.connect(**kwargs)

def _in_memory_table_exists(self, name: str) -> bool:
import psycopg2.errors

ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except psycopg2.errors.UndefinedTable:
return False
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
from psycopg2.extras import execute_batch

Expand Down
7 changes: 7 additions & 0 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,18 @@ def _register_udfs(self, expr: ir.Expr) -> None:
self._session.udf.register(f"unwrap_json_{typ.__name__}", unwrap_json(typ))
self._session.udf.register("unwrap_json_float", unwrap_json_float)

def _in_memory_table_exists(self, name: str) -> bool:
sql = f"SHOW TABLES IN {self.current_database} LIKE '{name}'"
return bool(self._session.sql(sql).count())

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = PySparkSchema.from_ibis(op.schema)
df = self._session.createDataFrame(data=op.data.to_frame(), schema=schema)
df.createTempView(op.name)

def _finalize_memtable(self, name: str) -> None:
self._session.catalog.dropTempView(name)

@contextlib.contextmanager
def _safe_raw_sql(self, query: str) -> Any:
yield self.raw_sql(query)
Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,21 @@ def create_table(
name, schema=schema, source=self, namespace=ops.Namespace(database=database)
).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
import psycopg2.errors

ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except psycopg2.errors.InternalError:
return False
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down
20 changes: 17 additions & 3 deletions ibis/backends/snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,23 @@ def list_tables(
return self._filter_with_like(tables + views, like=like)

def _in_memory_table_exists(self, name: str) -> bool:
with self.con.cursor() as con:
result = con.execute(f"SHOW TABLES LIKE '{name}'").fetchone()
return bool(result)
import snowflake.connector

ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.con.cursor() as cur:
cur.execute(sql).fetchall()
except snowflake.connector.errors.ProgrammingError as e:
# this cryptic error message is the only generic and reliable way
# to tell if the error means "table not found for any reason"
# otherwise, we need to reraise the exception
if e.sqlstate == "42S02":
return False
raise
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
import pyarrow.parquet as pq
Expand Down
12 changes: 12 additions & 0 deletions ibis/backends/sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,18 @@ def _generate_create_table(self, table: sge.Table, schema: sch.Schema):

return sge.Create(kind="TABLE", this=target)

def _in_memory_table_exists(self, name: str) -> bool:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
query = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)
try:
with self.begin() as cur:
cur.execute(query)
cur.fetchall()
except sqlite3.OperationalError:
return False
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
table = sg.table(op.name, quoted=self.compiler.quoted, catalog="temp")
create_stmt = self._generate_create_table(table, op.schema).sql(self.name)
Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/trino/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,21 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
df = TrinoPandasData.convert_table(df, schema)
return df

def _in_memory_table_exists(self, name: str) -> bool:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sg.select(sge.convert(1)).from_(ident).limit(0).sql(self.dialect)

try:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except trino.exceptions.TrinoUserError as e:
if e.error_name == "TABLE_NOT_FOUND":
return False
raise
else:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
Expand Down

0 comments on commit 20e0bb7

Please sign in to comment.