From 019cae5d8567477b7be38942069f66b6ce87805a Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 12:29:35 -0400 Subject: [PATCH] refactor(backends): clean up resources produced by `memtable` (#10055) --- ibis/backends/__init__.py | 15 +++++++++++ ibis/backends/bigquery/__init__.py | 11 +++++++++ ibis/backends/duckdb/__init__.py | 13 ++++------ ibis/backends/exasol/__init__.py | 29 +++++++++------------- ibis/backends/mssql/__init__.py | 19 ++++++-------- ibis/backends/mysql/__init__.py | 19 +++++++++++--- ibis/backends/oracle/__init__.py | 23 ++++++++--------- ibis/backends/pandas/__init__.py | 3 +++ ibis/backends/polars/__init__.py | 3 +++ ibis/backends/postgres/__init__.py | 12 ++++++--- ibis/backends/pyspark/__init__.py | 7 ------ ibis/backends/risingwave/__init__.py | 18 ++++++-------- ibis/backends/snowflake/__init__.py | 7 ------ ibis/backends/sql/__init__.py | 3 +++ ibis/backends/sqlite/__init__.py | 6 ----- ibis/backends/tests/test_client.py | 37 ++++++++++++++++++++++++++++ ibis/backends/trino/__init__.py | 5 ---- 17 files changed, 138 insertions(+), 92 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 7db4ba06f4ae..350937b5938b 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -2,6 +2,7 @@ import abc import collections.abc +import contextlib import functools import importlib.metadata import keyword @@ -1116,6 +1117,9 @@ def _register_in_memory_tables(self, expr: ir.Expr) -> None: for memtable in expr.op().find(ops.InMemoryTable): if not self._in_memory_table_exists(memtable.name): self._register_in_memory_table(memtable) + weakref.finalize( + memtable, self._finalize_in_memory_table, memtable.name + ) def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: if self.supports_in_memory_tables: @@ -1123,6 +1127,17 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: f"{self.name} must implement `_register_in_memory_table` to support in-memory tables" ) + def _finalize_in_memory_table(self, name: str) -> None: + """Wrap `_finalize_memtable` to suppress exceptions.""" + with contextlib.suppress(Exception): + self._finalize_memtable(name) + + def _finalize_memtable(self, name: str) -> None: + if self.supports_in_memory_tables: + raise NotImplementedError( + f"{self.name} must implement `_finalize_memtable` to support in-memory tables" + ) + def _run_pre_execute_hooks(self, expr: ir.Expr) -> None: """Backend-specific hooks to run before an expression is executed.""" self._register_udfs(expr) diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py index fd81a778b9fa..22f6b5e7e479 100644 --- a/ibis/backends/bigquery/__init__.py +++ b/ibis/backends/bigquery/__init__.py @@ -180,6 +180,17 @@ def _in_memory_table_exists(self, name: str) -> bool: else: return True + def _finalize_memtable(self, name: str) -> None: + session_dataset = self._session_dataset + table_id = sg.table( + name, + db=session_dataset.dataset_id, + catalog=session_dataset.project, + quoted=False, + ) + drop_sql_stmt = sge.Drop(kind="TABLE", this=table_id, exists=True) + self.raw_sql(drop_sql_stmt) + def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: session_dataset = self._session_dataset diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 73bc9629cdce..7dc8bed0f835 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -7,7 +7,6 @@ import os import urllib import warnings -import weakref from operator import itemgetter from pathlib import Path from typing import TYPE_CHECKING, Any @@ -160,12 +159,9 @@ def create_table( properties.append(sge.TemporaryProperty()) catalog = "temp" - temp_memtable_view = None - if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -234,9 +230,6 @@ def create_table( ).sql(self.name) ) - if temp_memtable_view is not None: - self.con.unregister(temp_memtable_view) - return self.table(name, database=(catalog, database)) def table( @@ -1620,11 +1613,15 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self.con.register(op.name, op.data.to_pyarrow(op.schema)) + def _finalize_memtable(self, name: str) -> None: # if we don't aggressively unregister tables duckdb will keep a # reference to every memtable ever registered, even if there's no # way for a user to access the operation anymore, resulting in a # memory leak - weakref.finalize(op, self.con.unregister, op.name) + # + # we can't use drop_table, because self.con.register creates a view, so + # use the corresponding unregister method + self.con.unregister(name) def _register_udfs(self, expr: ir.Expr) -> None: con = self.con diff --git a/ibis/backends/exasol/__init__.py b/ibis/backends/exasol/__init__.py index c31d19b334db..05456db385f5 100644 --- a/ibis/backends/exasol/__init__.py +++ b/ibis/backends/exasol/__init__.py @@ -1,6 +1,5 @@ from __future__ import annotations -import atexit import contextlib import datetime import re @@ -42,7 +41,6 @@ class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema): compiler = sc.exasol.compiler supports_temporary_tables = False supports_create_or_replace = False - supports_in_memory_tables = False supports_python_udfs = False @property @@ -278,14 +276,15 @@ def process_item(item: Any): with self._safe_raw_sql(create_stmt_sql): if not df.empty: self.con.ext.insert_multi(name, rows) - atexit.register(self._clean_up_tmp_table, ident) - def _clean_up_tmp_table(self, ident: sge.Identifier) -> None: - with self._safe_raw_sql( - sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True) - ): + def _clean_up_tmp_table(self, name: str) -> None: + ident = sg.to_identifier(name, quoted=self.compiler.quoted) + sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True) + with self._safe_raw_sql(sql): pass + _finalize_memtable = _clean_up_tmp_table + def create_table( self, name: str, @@ -334,11 +333,9 @@ def create_table( quoted = self.compiler.quoted - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -356,8 +353,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, catalog=database, quoted=quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, catalog=database, quoted=quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) create_stmt = sge.Create(kind="TABLE", this=target) @@ -365,7 +364,7 @@ def create_table( with self._safe_raw_sql(create_stmt): if query is not None: self.con.execute( - sge.Insert(this=table, expression=query).sql(self.name) + sge.Insert(this=table_expr, expression=query).sql(self.name) ) if overwrite: @@ -373,14 +372,10 @@ def create_table( sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name) ) self.con.execute( - f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}" + f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}" ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py index e42fd6a0d746..e20624e1cd07 100644 --- a/ibis/backends/mssql/__init__.py +++ b/ibis/backends/mssql/__init__.py @@ -625,11 +625,9 @@ def create_table( properties.append(sge.TemporaryProperty()) catalog, db = None, None - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -647,11 +645,14 @@ def create_table( if not schema: schema = table.schema() - table = sg.table( - "#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted - ) + quoted = self.compiler.quoted raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + target = sge.Schema( + this=sg.table( + "#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted + ), + expressions=schema.to_sqlglot(self.dialect), + ) create_stmt = sge.Create( kind="TABLE", @@ -659,7 +660,7 @@ def create_table( properties=sge.Properties(expressions=properties), ) - this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted) + this = sg.table(name, catalog=catalog, db=db, quoted=quoted) raw_this = sg.table(name, catalog=catalog, db=db, quoted=False) with self._safe_ddl(create_stmt) as cur: if query is not None: @@ -692,10 +693,6 @@ def create_table( db = "dbo" if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=(catalog, db)) # preserve the input schema if it was provided diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py index 9600f380065e..eec4df9ac634 100644 --- a/ibis/backends/mysql/__init__.py +++ b/ibis/backends/mysql/__init__.py @@ -425,8 +425,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) create_stmt = sge.Create( kind="TABLE", @@ -437,7 +439,9 @@ def create_table( this = sg.table(name, catalog=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.name) + insert_stmt = sge.Insert(this=table_expr, expression=query).sql( + self.name + ) cur.execute(insert_stmt) if overwrite: @@ -445,7 +449,7 @@ def create_table( sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name) ) cur.execute( - f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}" + f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}" ) if schema is None: @@ -538,3 +542,10 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: raise df = MySQLPandasData.convert_table(df, schema) return df + + def _finalize_memtable(self, name: str) -> None: + """No-op. + + Executing **any** SQL in a finalizer causes the underlying connection + socket to be set to `None`. It is unclear why this happens. + """ diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index 8cbfbfa372d3..51d9427a2a6c 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -2,7 +2,6 @@ from __future__ import annotations -import atexit import contextlib import re import warnings @@ -419,11 +418,9 @@ def create_table( if temp: properties.append(sge.TemporaryProperty()) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -468,10 +465,6 @@ def create_table( ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided @@ -527,8 +520,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: insert_stmt, list(data.iloc[start:end].itertuples(index=False)) ) - atexit.register(self._clean_up_tmp_table, name) - def _get_schema_using_query(self, query: str) -> sch.Schema: name = util.gen_name("oracle_metadata") dialect = self.name @@ -608,6 +599,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: return OraclePandasData.convert_table(df, schema) def _clean_up_tmp_table(self, name: str) -> None: + dialect = self.dialect + + ident = sg.to_identifier(name, quoted=self.compiler.quoted) + + truncate = sge.TruncateTable(expressions=[ident]).sql(dialect) + drop = sge.Drop(kind="TABLE", this=ident).sql(dialect) + with self.begin() as bind: # global temporary tables cannot be dropped without first truncating them # @@ -616,9 +614,8 @@ def _clean_up_tmp_table(self, name: str) -> None: # ignore DatabaseError exceptions because the table may not exist # because it's already been deleted with contextlib.suppress(oracledb.DatabaseError): - bind.execute(f'TRUNCATE TABLE "{name}"') + bind.execute(truncate) with contextlib.suppress(oracledb.DatabaseError): - bind.execute(f'DROP TABLE "{name}"') + bind.execute(drop) - def _drop_cached_table(self, name): - self._clean_up_tmp_table(name) + _finalize_memtable = _drop_cached_table = _clean_up_tmp_table diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py index b26a6e7ead9f..5404e744ed01 100644 --- a/ibis/backends/pandas/__init__.py +++ b/ibis/backends/pandas/__init__.py @@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs): def _create_cached_table(self, name, expr): return self.create_table(name, expr.execute()) + def _finalize_memtable(self, name: str) -> None: + """No-op, let Python handle clean up.""" + @lazy_singledispatch def _convert_object(obj: Any, _conn): diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py index 8afabfc79b3d..8fe8df3debed 100644 --- a/ibis/backends/polars/__init__.py +++ b/ibis/backends/polars/__init__.py @@ -81,6 +81,9 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self._add_table(op.name, op.data.to_polars(op.schema).lazy()) + def _finalize_memtable(self, name: str) -> None: + self.drop_table(name, force=True) + @deprecated( as_of="9.1", instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.", diff --git a/ibis/backends/postgres/__init__.py b/ibis/backends/postgres/__init__.py index c36d3874f550..fcfa517a8aff 100644 --- a/ibis/backends/postgres/__init__.py +++ b/ibis/backends/postgres/__init__.py @@ -663,8 +663,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, db=database, quoted=self.compiler.quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, db=database, quoted=self.compiler.quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) create_stmt = sge.Create( kind="TABLE", @@ -675,7 +677,9 @@ def create_table( this = sg.table(name, catalog=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect) + insert_stmt = sge.Insert(this=table_expr, expression=query).sql( + self.dialect + ) cur.execute(insert_stmt) if overwrite: @@ -683,7 +687,7 @@ def create_table( sge.Drop(kind="TABLE", this=this, exists=True).sql(self.dialect) ) cur.execute( - f"ALTER TABLE IF EXISTS {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" + f"ALTER TABLE IF EXISTS {table_expr.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" ) if schema is None: diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py index 9899f220702f..def05da78f82 100644 --- a/ibis/backends/pyspark/__init__.py +++ b/ibis/backends/pyspark/__init__.py @@ -594,13 +594,11 @@ def create_table( table_loc = self._to_sqlglot_table(database) catalog, db = self._to_catalog_db_tuple(table_loc) - temp_memtable_view = None if obj is not None: if isinstance(obj, ir.Expr): table = obj else: table = ibis.memtable(obj) - temp_memtable_view = table.op().name query = self.compile(table) mode = "overwrite" if overwrite else "error" with self._active_catalog_database(catalog, db): @@ -615,11 +613,6 @@ def create_table( else: raise com.IbisError("The schema or obj parameter is required") - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(name, database=(catalog, db)) def create_view( diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py index 2ccbe3b7a399..e824d93d93a3 100644 --- a/ibis/backends/risingwave/__init__.py +++ b/ibis/backends/risingwave/__init__.py @@ -195,11 +195,9 @@ def create_table( f"Creating temp tables is not supported by {self.name}" ) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -217,8 +215,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, db=database, quoted=self.compiler.quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, db=database, quoted=self.compiler.quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) if connector_properties is None: create_stmt = sge.Create( @@ -241,20 +241,18 @@ def create_table( this = sg.table(name, db=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect) + insert_stmt = sge.Insert(this=table_expr, expression=query).sql( + self.dialect + ) cur.execute(insert_stmt) if overwrite: self.drop_table(name, database=database, force=True) cur.execute( - f"ALTER TABLE {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" + f"ALTER TABLE {table_expr.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py index 91dfb8221297..5eb378186414 100644 --- a/ibis/backends/snowflake/__init__.py +++ b/ibis/backends/snowflake/__init__.py @@ -821,11 +821,9 @@ def create_table( if comment is not None: properties.append(sge.SchemaCommentProperty(this=sge.convert(comment))) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -846,11 +844,6 @@ def create_table( with self._safe_raw_sql(create_stmt): pass - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(name, database=(catalog, db)) def read_csv( diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 1b7896fb794d..2d64efe6076b 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -618,3 +618,6 @@ def _register_pandas_udf(self, udf_node: ops.ScalarUDF) -> str: raise NotImplementedError( f"pandas UDFs are not supported in the {self.dialect} backend" ) + + def _finalize_memtable(self, name: str) -> None: + self.drop_table(name, force=True) diff --git a/ibis/backends/sqlite/__init__.py b/ibis/backends/sqlite/__init__.py index 042c8168519a..c52c654486ee 100644 --- a/ibis/backends/sqlite/__init__.py +++ b/ibis/backends/sqlite/__init__.py @@ -456,11 +456,9 @@ def create_table( if schema is not None: schema = ibis.schema(schema) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): obj = ibis.memtable(obj) - temp_memtable_view = obj.op().name self._run_pre_execute_hooks(obj) @@ -516,10 +514,6 @@ def create_table( ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index c38a26f0241e..2d80b56c4b4a 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1767,3 +1767,40 @@ def test_insert_into_table_missing_columns(con, temp_table): expected_result = {"a": [1], "b": [1]} assert result == expected_result + + +@pytest.mark.never( + ["pandas", "dask"], raises=AssertionError, reason="backend is going away" +) +@pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables") +@pytest.mark.notyet( + ["clickhouse", "flink"], + raises=AssertionError, + reason="memtables are assembled every time", +) +@pytest.mark.notyet( + ["mysql"], + raises=AssertionError, + reason="can't execute SQL inside of a finalizer without breaking everything", +) +def test_memtable_cleanup(con): + name = ibis.util.gen_name("temp_memtable") + t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name=name) + + # the table isn't registered until we actually execute, and since we + # haven't yet executed anything, the table shouldn't be there + assert not con._in_memory_table_exists(name) + + # execute, which means the table is registered and should be visible in + # con.list_tables() + con.execute(t.select("a")) + assert con._in_memory_table_exists(name) + + con.execute(t.select("b")) + assert con._in_memory_table_exists(name) + + # remove all references to `t`, which means the `op` shouldn't be reachable + # and the table should thus be dropped and no longer visible in + # con.list_tables() + del t + assert not con._in_memory_table_exists(name) diff --git a/ibis/backends/trino/__init__.py b/ibis/backends/trino/__init__.py index da70f42f40de..7c4ab32ec77b 100644 --- a/ibis/backends/trino/__init__.py +++ b/ibis/backends/trino/__init__.py @@ -483,13 +483,11 @@ def create_table( if comment: property_list.append(sge.SchemaCommentProperty(this=sge.convert(comment))) - temp_memtable_view = None if obj is not None: if isinstance(obj, ir.Table): table = obj else: table = ibis.memtable(obj, schema=schema) - temp_memtable_view = table.op().name self._run_pre_execute_hooks(table) @@ -533,9 +531,6 @@ def create_table( ).sql(self.name) ) - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(orig_table_ref.name, database=(catalog, db)) def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: