Clean up materialised tables that Splink creates when we're not using linker.py (profiling) #2058

Merged · 6 commits · Mar 14, 2024

5 changes: 5 additions & 0 deletions splink/database_api.py
@@ -339,3 +339,8 @@ def remove_splinkdataframe_from_cache(self, splink_dataframe: SplinkDataFrame):

for k in keys_to_delete:
del self._intermediate_table_cache[k]

def delete_tables_created_by_splink_from_db(self):
@RobinL (Member, Author) commented on Mar 14, 2024:

Moved this from linker.py to the database_api, because otherwise it isn't accessible to methods like profile_columns, which use the dbapi but not the linker.

for splink_df in list(self._intermediate_table_cache.values()):
if splink_df.created_by_splink:
splink_df.drop_table_from_database_and_remove_from_cache()
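
To make the accessibility point concrete, here is a minimal sketch of dbapi-only usage (it assumes the DuckDBAPI constructor defaults to an in-memory connection; no Linker is ever created):

import pandas as pd

from splink.duckdb.database_api import DuckDBAPI  # path as in this PR's tree

db_api = DuckDBAPI()

# Register an input table and let dbapi-only code (e.g. profile_columns)
# create whatever intermediate tables it needs...
db_api._table_registration(pd.DataFrame({"a": [1, 2]}), "my_input")

# ...then clean up without constructing a Linker. Before this change the
# cleanup loop lived only on the Linker, so dbapi-only callers had no way
# to invoke it.
db_api.delete_tables_created_by_splink_from_db()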
17 changes: 12 additions & 5 deletions splink/duckdb/database_api.py
@@ -55,17 +55,24 @@ def __init__(
"""
)

def delete_table_from_database(self, name: str):
# If the table is in fact a pandas dataframe that's been registered using
# duckdb con.register() then DROP TABLE will fail with
# Catalog Error: x is of type View
try:
drop_sql = f"DROP TABLE IF EXISTS {name}"
self._execute_sql_against_backend(drop_sql)
except duckdb.CatalogException:
drop_sql = f"DROP VIEW IF EXISTS {name}"
self._execute_sql_against_backend(drop_sql)

def _table_registration(self, input, table_name) -> None:
if isinstance(input, dict):
input = pd.DataFrame(input)
elif isinstance(input, list):
input = pd.DataFrame.from_records(input)

# Registration errors will automatically
@RobinL (Member, Author) commented on Mar 14, 2024:

Do you remember the details of this comment? Is it intended to describe the previous behaviour of register, or is it just a general comment pointing out that we don't need any error-checking logic?

I think you also get error checking when using register:

con = duckdb.connect()
con.register("registered_table", [{'a': 1}])
> InvalidInputException: Invalid Input Error: Python Object list not suitable to be registered as a view

Contributor commented:

I think this is just a general comment; it is around in Splink 3 already, and seems to originate (with a slight expansion) from quite a while back.

# occur if an invalid data type is passed as an argument
self._execute_sql_against_backend(
f"CREATE TABLE {table_name} AS SELECT * FROM input"
)
self._con.register(table_name, input)
@RobinL (Member, Author) commented on Mar 14, 2024:

This change has two effects:

  1. It reduces memory usage; see the script below.
  2. It means the table doesn't need to be cleaned up. The CREATE TABLE version creates a materialised table in the database, but since this is not a SplinkDataFrame and is not registered in the intermediate table cache, it isn't cleaned up by delete_tables_created_by_splink_from_db(). That method iterates through the SplinkDataFrames that Splink knows about and deletes those whose created_by_splink flag is True.

Illustration of how register doesn't use extra memory but CREATE TABLE does:
import os

import duckdb
import pandas as pd
import psutil


def get_size():
    process = psutil.Process(os.getpid())
    bytes = process.memory_info().rss  # in bytes
    factor = 1024
    for unit in ["", "K", "M", "G", "T", "P"]:
        if bytes < factor:
            return f"{bytes:.2f}{unit}B"
        bytes /= factor


print("Initial memory usage:", get_size())

df = pd.read_parquet(
    "/Users/robinlinacre/Documents/data_linking/ohid_docker/synthetic_1m.parquet"
)

print("Memory usage after loading DataFrame:", get_size())

con  = duckdb.connect()

con.register("registered_table", df)
print("Memory usage after registering DataFrame as a table in DuckDB:", get_size())

con.sql("CREATE TABLE my_table AS SELECT * FROM registered_table")

print("Memory usage after creating a new table from the registered table:", get_size())
Output:

Initial memory usage: 119.45MB
Memory usage after loading DataFrame: 865.04MB
Memory usage after registering DataFrame as a table in DuckDB: 867.31MB
Memory usage after creating a new table from the registered table: 1000.46MB

@RobinL (Member, Author) commented:

In terms of what this does, see the DuckDB docs:

> DuckDB also supports “registering” a DataFrame or Arrow object as a virtual table, comparable to a SQL VIEW. This is useful when querying a DataFrame/Arrow object that is stored in another way (as a class variable, or a value in a dictionary). Below is a Pandas example.
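
A quick way to see the distinction (a standalone sketch; the table names are arbitrary): a registered DataFrame shows up as a VIEW, while CREATE TABLE produces a BASE TABLE.

import duckdb
import pandas as pd

con = duckdb.connect()
df = pd.DataFrame({"a": [1, 2, 3]})

con.register("registered_table", df)  # virtual table, backed by the DataFrame
con.sql("CREATE TABLE materialised_table AS SELECT * FROM registered_table")

# information_schema reports the difference, e.g.
# [('materialised_table', 'BASE TABLE'), ('registered_table', 'VIEW')]
print(
    con.sql(
        "SELECT table_name, table_type FROM information_schema.tables "
        "ORDER BY table_name"
    ).fetchall()
)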

Contributor commented:

Yeah, that makes sense. I originally switched to using tables when I implemented the DatabaseAPI, as at that time it was difficult getting things working with some things being tables and some being views (see note under 'DuckDB registration'), and that was easiest for the time being. But happy to go for the lighter approach if that works okay.

Contributor commented:

Ah, looks like there is still some sort of clash with views/tables. But yeah, if that can be resolved, I'm all in favour of the reduced-memory option.
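
The clash, reproduced standalone: DROP TABLE on a registered (view-backed) name raises duckdb.CatalogException, which is exactly the fallback the new delete_table_from_database implements.

import duckdb
import pandas as pd

con = duckdb.connect()
con.register("x", pd.DataFrame({"a": [1]}))  # 'x' is a View, not a Table

name = "x"
try:
    con.execute(f"DROP TABLE IF EXISTS {name}")
except duckdb.CatalogException:
    # "Catalog Error: x is of type View" -- fall back to DROP VIEW
    con.execute(f"DROP VIEW IF EXISTS {name}")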


def table_to_splink_dataframe(
self, templated_name, physical_name
4 changes: 1 addition & 3 deletions splink/linker.py
@@ -806,9 +806,7 @@ def _populate_m_u_from_trained_values(self):
cl.m_probability = cl._trained_m_median

def delete_tables_created_by_splink_from_db(self):
for splink_df in list(self._intermediate_table_cache.values()):
if splink_df.created_by_splink:
splink_df.drop_table_from_database_and_remove_from_cache()
self.db_api.delete_tables_created_by_splink_from_db()

def _raise_error_if_necessary_waterfall_columns_not_computed(self):
ricc = self._settings_obj._retain_intermediate_calculation_columns
2 changes: 2 additions & 0 deletions splink/profile_data.py
@@ -319,6 +319,8 @@ def profile_columns(
)
inner_charts.append(inner_chart)

db_api.delete_tables_created_by_splink_from_db()

if inner_charts != []:
outer_spec = deepcopy(_outer_chart_spec_freq)
outer_spec["vconcat"] = inner_charts
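
End to end, profiling now tidies up after itself. A hedged usage sketch (the import paths and the profile_columns signature are assumed from this PR's dev-branch tree and may differ in released versions):

import pandas as pd

from splink.duckdb.database_api import DuckDBAPI  # assumed dev-branch path
from splink.profile_data import profile_columns   # assumed dev-branch path

db_api = DuckDBAPI()
df = pd.DataFrame({"first_name": ["alice", "bob", "alice", "bob", "carol"]})

# Signature assumed from the diff context: table(s) first, then the db_api.
chart = profile_columns(df, db_api, column_expressions=["first_name"])

# Thanks to the cleanup call added above, no materialised __splink__*
# intermediate tables should be left behind in the DuckDB database:
print(db_api._con.sql("SHOW TABLES").fetchall())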