From 7acbdfa097a30b499a4becfcd21695be883b187d Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Thu, 14 Mar 2024 20:44:48 +0000
Subject: [PATCH 1/7] initial tests

---
 splink/linker.py | 47 +++++++++++++++++++++++++++++++----------------
 1 file changed, 31 insertions(+), 16 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index eb6f6f77ca..07380df11b 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -535,6 +535,17 @@ def _initialise_df_concat_with_tf(self, materialise=True):
 
         return nodes_with_tf
 
+    def _enqueue_df_concat_with_tf(self, pipeline: SQLPipeline):
+
+        sql = vertically_concatenate_sql(self)
+        pipeline.enqueue_sql(sql, "__splink__df_concat")
+
+        sqls = compute_all_term_frequencies_sqls(self)
+        for sql in sqls:
+            pipeline.enqueue_sql(sql["sql"], sql["output_table_name"])
+
+        return pipeline
+
     def _table_to_splink_dataframe(
         self, templated_name, physical_name
     ) -> SplinkDataFrame:
@@ -1283,19 +1294,23 @@ def predict(
 
         """
 
+        pipeline = SQLPipeline()
+
         # If materialise_after_computing_term_frequencies=False and the user only
         # calls predict, it runs as a single pipeline with no materialisation
        # of anything.
 
-        # _initialise_df_concat_with_tf returns None if the table doesn't exist
-        # and only SQL is queued in this step.
-        nodes_with_tf = self._initialise_df_concat_with_tf(
-            materialise=materialise_after_computing_term_frequencies
-        )
+        self._enqueue_df_concat_with_tf(pipeline)
+
+        # In duckdb, calls to random() in a CTE pipeline cause problems:
+        # https://gist.github.com/RobinL/d329e7004998503ce91b68479aa41139
+        if self._settings_obj.salting_required:
+            materialise = True
 
         input_dataframes = []
-        if nodes_with_tf:
-            input_dataframes.append(nodes_with_tf)
+        if materialise_after_computing_term_frequencies:
+            nodes_with_tf = self.db_api.sql_pipeline_to_splink_dataframe(pipeline)
+            pipeline = SQLPipeline()
 
         # If exploded blocking rules exist, we need to materialise
         # the tables of ID pairs
@@ -1715,9 +1730,9 @@ def _compute_metrics_nodes(
 
         df_node_metrics = self._execute_sql_pipeline()
 
-        df_node_metrics.metadata[
-            "threshold_match_probability"
-        ] = threshold_match_probability
+        df_node_metrics.metadata["threshold_match_probability"] = (
+            threshold_match_probability
+        )
         return df_node_metrics
 
     def _compute_metrics_edges(
@@ -1752,9 +1767,9 @@
         df_edge_metrics = compute_edge_metrics(
             self, df_node_metrics, df_predict, df_clustered, threshold_match_probability
         )
-        df_edge_metrics.metadata[
-            "threshold_match_probability"
-        ] = threshold_match_probability
+        df_edge_metrics.metadata["threshold_match_probability"] = (
+            threshold_match_probability
+        )
         return df_edge_metrics
 
     def _compute_metrics_clusters(
@@ -1794,9 +1809,9 @@
             self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
         df_cluster_metrics = self._execute_sql_pipeline()
-        df_cluster_metrics.metadata[
-            "threshold_match_probability"
-        ] = df_node_metrics.metadata["threshold_match_probability"]
+        df_cluster_metrics.metadata["threshold_match_probability"] = (
+            df_node_metrics.metadata["threshold_match_probability"]
+        )
         return df_cluster_metrics
 
     def compute_graph_metrics(

From 38ff770647f251d15f261dabaadff11ec766a281 Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Fri, 15 Mar 2024 09:00:15 +0000
Subject: [PATCH 2/7] use fresh pipeline for predict

---
 splink/linker.py   | 19 +++++++++++--------
 splink/pipeline.py |  5 +++++
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index 07380df11b..a5ce33f10b 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -1305,11 +1305,12 @@ def predict(
         # In duckdb, calls to random() in a CTE pipeline cause problems:
         # https://gist.github.com/RobinL/d329e7004998503ce91b68479aa41139
         if self._settings_obj.salting_required:
-            materialise = True
+            materialise_after_computing_term_frequencies = True
 
         input_dataframes = []
         if materialise_after_computing_term_frequencies:
             nodes_with_tf = self.db_api.sql_pipeline_to_splink_dataframe(pipeline)
+            input_dataframes.append(nodes_with_tf)
             pipeline = SQLPipeline()
 
         # If exploded blocking rules exist, we need to materialise
@@ -1317,20 +1318,21 @@ def predict(
         exploding_br_with_id_tables = materialise_exploded_id_tables(self)
 
         sqls = block_using_rules_sqls(self)
-        for sql in sqls:
-            self._enqueue_sql(sql["sql"], sql["output_table_name"])
+        pipeline.enqueue_list_of_sqls(sqls)
 
         repartition_after_blocking = getattr(self, "repartition_after_blocking", False)
 
         # repartition after blocking only exists on the SparkLinker
         if repartition_after_blocking:
-            df_blocked = self._execute_sql_pipeline(input_dataframes)
+            df_blocked = self.db_api.sql_pipeline_to_splink_dataframe(
+                pipeline, input_dataframes
+            )
             input_dataframes.append(df_blocked)
 
         sql = compute_comparison_vector_values_sql(
             self._settings_obj._columns_to_select_for_comparison_vector_values
         )
-        self._enqueue_sql(sql, "__splink__df_comparison_vectors")
+        pipeline.enqueue_sql(sql, "__splink__df_comparison_vectors")
 
         sqls = predict_from_comparison_vectors_sqls_using_settings(
             self._settings_obj,
@@ -1338,10 +1340,11 @@ def predict(
             threshold_match_weight,
             sql_infinity_expression=self._infinity_expression,
         )
-        for sql in sqls:
-            self._enqueue_sql(sql["sql"], sql["output_table_name"])
+        pipeline.enqueue_list_of_sqls(sqls)
 
-        predictions = self._execute_sql_pipeline(input_dataframes)
+        predictions = self.db_api.sql_pipeline_to_splink_dataframe(
+            pipeline, input_dataframes
+        )
         self._predict_warning()
 
         [b.drop_materialised_id_pairs_dataframe() for b in exploding_br_with_id_tables]
diff --git a/splink/pipeline.py b/splink/pipeline.py
index 3a5d418ed9..40cd70bc89 100644
--- a/splink/pipeline.py
+++ b/splink/pipeline.py
@@ -1,5 +1,6 @@
 import logging
 from copy import deepcopy
+from typing import List
 
 import sqlglot
 from sqlglot.errors import ParseError
@@ -45,6 +46,10 @@ def enqueue_sql(self, sql, output_table_name):
         sql_task = SQLTask(sql, output_table_name)
         self.queue.append(sql_task)
 
+    def enqueue_list_of_sqls(self, sql_list: List[dict]):
+        for sql_dict in sql_list:
+            self.enqueue_sql(sql_dict["sql"], sql_dict["output_table_name"])
+
     def generate_pipeline_parts(self, input_dataframes):
         parts = deepcopy(self.queue)
         for df in input_dataframes:

From da158f00dc7ebda26e159c494b8ae710170b869d Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Fri, 15 Mar 2024 09:02:50 +0000
Subject: [PATCH 3/7] lint with black

---
 splink/linker.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index a5ce33f10b..b385b8adbc 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -1733,9 +1733,9 @@ def _compute_metrics_nodes(
 
         df_node_metrics = self._execute_sql_pipeline()
 
-        df_node_metrics.metadata["threshold_match_probability"] = (
-            threshold_match_probability
-        )
+        df_node_metrics.metadata[
+            "threshold_match_probability"
+        ] = threshold_match_probability
         return df_node_metrics
 
     def _compute_metrics_edges(
@@ -1770,9 +1770,9 @@
         df_edge_metrics = compute_edge_metrics(
             self, df_node_metrics, df_predict, df_clustered, threshold_match_probability
         )
-        df_edge_metrics.metadata["threshold_match_probability"] = (
-            threshold_match_probability
-        )
+        df_edge_metrics.metadata[
+            "threshold_match_probability"
+        ] = threshold_match_probability
         return df_edge_metrics
 
     def _compute_metrics_clusters(
@@ -1812,9 +1812,9 @@
             self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
         df_cluster_metrics = self._execute_sql_pipeline()
-        df_cluster_metrics.metadata["threshold_match_probability"] = (
-            df_node_metrics.metadata["threshold_match_probability"]
-        )
+        df_cluster_metrics.metadata[
+            "threshold_match_probability"
+        ] = df_node_metrics.metadata["threshold_match_probability"]
         return df_cluster_metrics
 
     def compute_graph_metrics(

From c30d0d50997e39efbb10f0d2bed52e47ae950059 Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Sat, 16 Mar 2024 14:50:24 +0000
Subject: [PATCH 4/7] update deterministic link to use fresh pipeline

---
 splink/linker.py | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index b385b8adbc..64bccb5961 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -954,19 +954,24 @@ def deterministic_link(self) -> SplinkDataFrame:
                 SplinkDataFrame allow you to access the underlying data.
         """
 
+        pipeline = SQLPipeline()
+
         # Allows clustering during a deterministic linkage.
         # This is used in `cluster_pairwise_predictions_at_threshold`
         # to set the cluster threshold to 1
         self._deterministic_link_mode = True
 
-        concat_with_tf = self._initialise_df_concat_with_tf()
+        self._enqueue_df_concat_with_tf(pipeline)
+        concat_with_tf = self.db_api.sql_pipeline_to_splink_dataframe(pipeline)
+
         exploding_br_with_id_tables = materialise_exploded_id_tables(self)
 
         sqls = block_using_rules_sqls(self)
-        for sql in sqls:
-            self._enqueue_sql(sql["sql"], sql["output_table_name"])
+        pipeline.enqueue_list_of_sqls(sqls)
 
-        deterministic_link_df = self._execute_sql_pipeline([concat_with_tf])
+        deterministic_link_df = predictions = (
+            self.db_api.sql_pipeline_to_splink_dataframe(pipeline, [concat_with_tf])
+        )
 
         [b.drop_materialised_id_pairs_dataframe() for b in exploding_br_with_id_tables]
         return deterministic_link_df
@@ -1733,9 +1738,9 @@ def _compute_metrics_nodes(
 
         df_node_metrics = self._execute_sql_pipeline()
 
-        df_node_metrics.metadata[
-            "threshold_match_probability"
-        ] = threshold_match_probability
+        df_node_metrics.metadata["threshold_match_probability"] = (
+            threshold_match_probability
+        )
         return df_node_metrics
 
     def _compute_metrics_edges(
@@ -1770,9 +1775,9 @@
         df_edge_metrics = compute_edge_metrics(
             self, df_node_metrics, df_predict, df_clustered, threshold_match_probability
         )
-        df_edge_metrics.metadata[
-            "threshold_match_probability"
-        ] = threshold_match_probability
+        df_edge_metrics.metadata["threshold_match_probability"] = (
+            threshold_match_probability
+        )
         return df_edge_metrics
 
     def _compute_metrics_clusters(
@@ -1812,9 +1817,9 @@
             self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
         df_cluster_metrics = self._execute_sql_pipeline()
-        df_cluster_metrics.metadata[
-            "threshold_match_probability"
-        ] = df_node_metrics.metadata["threshold_match_probability"]
+        df_cluster_metrics.metadata["threshold_match_probability"] = (
+            df_node_metrics.metadata["threshold_match_probability"]
+        )
         return df_cluster_metrics
 
     def compute_graph_metrics(

From 7f93d633db8f0d66c0c3062bb42946618684ee59 Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Sat, 16 Mar 2024 14:51:28 +0000
Subject: [PATCH 5/7] lint with black

---
 splink/linker.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index 64bccb5961..9089b2613f 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -969,9 +969,9 @@ def deterministic_link(self) -> SplinkDataFrame:
         sqls = block_using_rules_sqls(self)
         pipeline.enqueue_list_of_sqls(sqls)
 
-        deterministic_link_df = predictions = (
-            self.db_api.sql_pipeline_to_splink_dataframe(pipeline, [concat_with_tf])
-        )
+        deterministic_link_df = (
+            predictions
+        ) = self.db_api.sql_pipeline_to_splink_dataframe(pipeline, [concat_with_tf])
 
         [b.drop_materialised_id_pairs_dataframe() for b in exploding_br_with_id_tables]
         return deterministic_link_df
@@ -1738,9 +1738,9 @@ def _compute_metrics_nodes(
 
         df_node_metrics = self._execute_sql_pipeline()
 
-        df_node_metrics.metadata["threshold_match_probability"] = (
-            threshold_match_probability
-        )
+        df_node_metrics.metadata[
+            "threshold_match_probability"
+        ] = threshold_match_probability
         return df_node_metrics
 
     def _compute_metrics_edges(
@@ -1775,9 +1775,9 @@
         df_edge_metrics = compute_edge_metrics(
             self, df_node_metrics, df_predict, df_clustered, threshold_match_probability
         )
-        df_edge_metrics.metadata["threshold_match_probability"] = (
-            threshold_match_probability
-        )
+        df_edge_metrics.metadata[
+            "threshold_match_probability"
+        ] = threshold_match_probability
         return df_edge_metrics
 
     def _compute_metrics_clusters(
@@ -1817,9 +1817,9 @@
             self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
         df_cluster_metrics = self._execute_sql_pipeline()
-        df_cluster_metrics.metadata["threshold_match_probability"] = (
-            df_node_metrics.metadata["threshold_match_probability"]
-        )
+        df_cluster_metrics.metadata[
+            "threshold_match_probability"
+        ] = df_node_metrics.metadata["threshold_match_probability"]
         return df_cluster_metrics
 
     def compute_graph_metrics(

From 480d79f52957e110a33409628e0db639587f2587 Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Sat, 16 Mar 2024 14:56:19 +0000
Subject: [PATCH 6/7] fix error

---
 splink/linker.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index 9089b2613f..49040f87c2 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -969,9 +969,9 @@ def deterministic_link(self) -> SplinkDataFrame:
         sqls = block_using_rules_sqls(self)
         pipeline.enqueue_list_of_sqls(sqls)
 
-        deterministic_link_df = (
-            predictions
-        ) = self.db_api.sql_pipeline_to_splink_dataframe(pipeline, [concat_with_tf])
+        deterministic_link_df = self.db_api.sql_pipeline_to_splink_dataframe(
+            pipeline, [concat_with_tf]
+        )
 
         [b.drop_materialised_id_pairs_dataframe() for b in exploding_br_with_id_tables]
         return deterministic_link_df
@@ -1738,9 +1738,9 @@ def _compute_metrics_nodes(
 
         df_node_metrics = self._execute_sql_pipeline()
 
-        df_node_metrics.metadata[
-            "threshold_match_probability"
-        ] = threshold_match_probability
+        df_node_metrics.metadata["threshold_match_probability"] = (
+            threshold_match_probability
+        )
         return df_node_metrics
 
     def _compute_metrics_edges(
@@ -1775,9 +1775,9 @@
         df_edge_metrics = compute_edge_metrics(
             self, df_node_metrics, df_predict, df_clustered, threshold_match_probability
         )
-        df_edge_metrics.metadata[
-            "threshold_match_probability"
-        ] = threshold_match_probability
+        df_edge_metrics.metadata["threshold_match_probability"] = (
+            threshold_match_probability
+        )
         return df_edge_metrics
 
     def _compute_metrics_clusters(
@@ -1817,9 +1817,9 @@
             self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
         df_cluster_metrics = self._execute_sql_pipeline()
-        df_cluster_metrics.metadata[
-            "threshold_match_probability"
-        ] = df_node_metrics.metadata["threshold_match_probability"]
+        df_cluster_metrics.metadata["threshold_match_probability"] = (
+            df_node_metrics.metadata["threshold_match_probability"]
+        )
         return df_cluster_metrics
 
     def compute_graph_metrics(

From d5d361843758823526dfe4dedceb49b49bc5417b Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Sat, 16 Mar 2024 14:57:14 +0000
Subject: [PATCH 7/7] lint with black

---
 splink/linker.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index 49040f87c2..ebe71b017d 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -1738,9 +1738,9 @@ def _compute_metrics_nodes(
 
         df_node_metrics = self._execute_sql_pipeline()
 
-        df_node_metrics.metadata["threshold_match_probability"] = (
-            threshold_match_probability
-        )
+        df_node_metrics.metadata[
+            "threshold_match_probability"
+        ] = threshold_match_probability
         return df_node_metrics
 
     def _compute_metrics_edges(
@@ -1775,9 +1775,9 @@
         df_edge_metrics = compute_edge_metrics(
             self, df_node_metrics, df_predict, df_clustered, threshold_match_probability
         )
-        df_edge_metrics.metadata["threshold_match_probability"] = (
-            threshold_match_probability
-        )
+        df_edge_metrics.metadata[
+            "threshold_match_probability"
+        ] = threshold_match_probability
         return df_edge_metrics
 
     def _compute_metrics_clusters(
@@ -1817,9 +1817,9 @@
             self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
         df_cluster_metrics = self._execute_sql_pipeline()
-        df_cluster_metrics.metadata["threshold_match_probability"] = (
-            df_node_metrics.metadata["threshold_match_probability"]
-        )
+        df_cluster_metrics.metadata[
+            "threshold_match_probability"
+        ] = df_node_metrics.metadata["threshold_match_probability"]
         return df_cluster_metrics
 
     def compute_graph_metrics(
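
Taken together, the commits above replace the linker's implicit enqueue/execute state (self._enqueue_sql / self._execute_sql_pipeline) with an explicit SQLPipeline that is built up and then handed to db_api.sql_pipeline_to_splink_dataframe, optionally materialising intermediate results and starting a fresh pipeline (for example when salting requires it). Below is a minimal, self-contained Python sketch of that pattern, assuming only what the diff shows: enqueue_sql and enqueue_list_of_sqls mirror the methods in splink/pipeline.py, while to_cte_query, the example SQL and the usage at the bottom are invented for illustration and are not Splink's real API.

from typing import List


class SQLTask:
    def __init__(self, sql: str, output_table_name: str):
        self.sql = sql
        self.output_table_name = output_table_name


class SQLPipeline:
    def __init__(self):
        self.queue: List[SQLTask] = []

    def enqueue_sql(self, sql: str, output_table_name: str) -> None:
        # Append one step; each step becomes a named CTE in the final query.
        self.queue.append(SQLTask(sql, output_table_name))

    def enqueue_list_of_sqls(self, sql_list: List[dict]) -> None:
        # Mirrors the helper added in PATCH 2: each dict carries the SQL plus
        # the name of the table/CTE it produces.
        for sql_dict in sql_list:
            self.enqueue_sql(sql_dict["sql"], sql_dict["output_table_name"])

    def to_cte_query(self) -> str:
        # Illustrative stand-in for executing the pipeline: fold the queued
        # steps into a single WITH ... SELECT statement and select from the
        # final step.
        ctes = ",\n".join(
            f"{task.output_table_name} AS ({task.sql})" for task in self.queue
        )
        final = self.queue[-1].output_table_name
        return f"WITH {ctes}\nSELECT * FROM {final}"


if __name__ == "__main__":
    # Hypothetical usage following the shape of predict() after the refactor:
    # build up the pipeline, then run it in one round trip. The patches instead
    # materialise part-way through and start a fresh SQLPipeline when salting
    # or term-frequency materialisation requires it.
    pipeline = SQLPipeline()
    pipeline.enqueue_sql("SELECT * FROM input_table", "__splink__df_concat")
    pipeline.enqueue_list_of_sqls(
        [
            {
                "sql": "SELECT *, COUNT(*) OVER (PARTITION BY first_name) AS tf "
                "FROM __splink__df_concat",
                "output_table_name": "__splink__df_concat_with_tf",
            }
        ]
    )
    print(pipeline.to_cte_query())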