Skip to content

Commit

Permalink
fix(insert): user can specify insert table in specified database (#10285)
Browse files Browse the repository at this point in the history

Co-authored-by: Phillip Cloud <[email protected]>
  • Loading branch information
gforsyth and cpcloud authored Oct 16, 2024
1 parent de943ad commit 0e848d0
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 8 deletions.
13 changes: 8 additions & 5 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ def insert(
obj: pd.DataFrame | ir.Table,
settings: Mapping[str, Any] | None = None,
overwrite: bool = False,
database: str | None = None,
**kwargs: Any,
):
import pandas as pd
Expand All @@ -436,16 +437,18 @@ def insert(
self.truncate_table(name)

if isinstance(obj, pa.Table):
return self.con.insert_arrow(name, obj, settings=settings, **kwargs)
return self.con.insert_arrow(
name, obj, database=database, settings=settings, **kwargs
)
elif isinstance(obj, pd.DataFrame):
return self.con.insert_df(name, obj, settings=settings, **kwargs)
elif not isinstance(obj, ir.Table):
obj = ibis.memtable(obj)

query = self._build_insert_from_table(target=name, source=obj)
query = self._build_insert_from_table(target=name, source=obj, db=database)
external_tables = self._collect_in_memory_tables(obj, {})
external_data = self._normalize_external_tables(external_tables)
return self.con.command(query.sql(self.name), external_data=external_data)
return self.con.command(query.sql(self.dialect), external_data=external_data)

def raw_sql(
self,
Expand Down Expand Up @@ -508,8 +511,8 @@ def get_schema(
"""
if catalog is not None:
raise com.UnsupportedBackendFeatureError(
"`catalog` namespaces are not supported by clickhouse"
raise com.UnsupportedOperationError(
"`catalog` namespaces are not supported by ClickHouse"
)
query = sge.Describe(this=sg.table(table_name, db=database))
try:
Expand Down
8 changes: 8 additions & 0 deletions ibis/backends/clickhouse/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pytest import param

import ibis
import ibis.common.exceptions as exc
import ibis.expr.datatypes as dt
import ibis.expr.types as ir
from ibis import config, udf
Expand Down Expand Up @@ -458,3 +459,10 @@ def test_query_cache(con, method_name):
method(settings={"ooze_query_cash": True})

assert result == expected


def test_invalid_catalog_argument(con):
    """ClickHouse has no catalog namespace; `get_schema` must reject `catalog`."""
    expected_error = exc.UnsupportedOperationError
    expected_message = "`catalog` namespaces are not supported"
    with pytest.raises(expected_error, match=expected_message):
        con.get_schema("t", catalog="a", database="b")
8 changes: 6 additions & 2 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,18 +906,22 @@ def insert(
)
return self.raw_sql(statement.compile())

identifier = sg.table(
table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
).sql(self.dialect)

if isinstance(obj, pa.Table):
obj = obj.to_pandas()
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
if isinstance(obj, pd.DataFrame):
table = self._table_env.from_pandas(obj)
return table.execute_insert(table_name, overwrite=overwrite)
return table.execute_insert(identifier, overwrite=overwrite)

if isinstance(obj, list):
# pyflink infers datatypes, which may sometimes result in incompatible types
table = self._table_env.from_elements(obj)
return table.execute_insert(table_name, overwrite=overwrite)
return table.execute_insert(identifier, overwrite=overwrite)

raise ValueError(
"No operation is being performed. Either the obj parameter "
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def _build_insert_from_table(
# Compare the columns between the target table and the object to be inserted
# If source is a subset of target, use source columns for insert list
# Otherwise, assume auto-generated column names and use positional ordering.
target_cols = self.get_schema(target).keys()
target_cols = self.get_schema(target, catalog=catalog, database=db).keys()

columns = (
source_cols
Expand Down
34 changes: 34 additions & 0 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,40 @@ def test_overwrite(ddl_con, monkeypatch):
assert t2.count().execute() == expected_count


@contextlib.contextmanager
def create_and_destroy_db(con):
    """Yield the name of a freshly created database, dropping it on exit.

    The drop happens in a ``finally`` block so the database is cleaned up
    even when the body using this context manager raises.
    """
    database_name = gen_name("db")
    con.create_database(database_name)
    try:
        yield database_name
    finally:
        con.drop_database(database_name)


# TODO: move this to something like `test_ddl.py`
@pytest.mark.notyet(
    ["flink"],
    reason="unclear whether Flink supports cross catalog/database inserts",
    raises=Py4JJavaError,
)
def test_insert_with_database_specified(con_create_database):
    """Inserting into a table that lives in a non-default database works.

    The table is created with three rows and the same three rows are inserted
    again, so the final count is six.
    """
    con = con_create_database

    data = ibis.memtable({"a": [1, 2, 3]})

    with create_and_destroy_db(con) as dbname:
        tname = gen_name("table")
        con.create_table(
            tname,
            obj=data,
            database=dbname,
            # flink requires temporary tables for this flow
            temp=con.name == "flink",
        )
        try:
            con.insert(tname, obj=data, database=dbname)
            assert con.table(tname, database=dbname).count().to_pandas() == 6
        finally:
            con.drop_table(tname, database=dbname)


@pytest.mark.notyet(["datafusion"], reason="cannot list or drop catalogs")
def test_create_catalog(con_create_catalog):
catalog = gen_name("test_create_catalog")
Expand Down

0 comments on commit 0e848d0

Please sign in to comment.