[Splink4] Use fresh SQLPipeline for all linker methods #2060

Closed
RobinL opened this issue Mar 14, 2024 · 4 comments

RobinL commented Mar 14, 2024

Rather than a shared, mutable self._pipeline, use a fresh pipeline = SQLPipeline() in all methods such as predict().

One challenge with this at the moment is materialisation.

At the moment, we have methods like _initialise_df_concat_with_tf, which are difficult to understand because of how they mutate and return things:

  • It mutates self._pipeline by adding SQL to it
  • If the user has asked to materialise, it returns a SplinkDataFrame. The pipeline will then be flushed because it has been executed
  • If not, it will have added to the pipeline, and returns None

One reason for this behaviour is that _initialise_df_concat_with_tf is created once and used many times, so we often want to compute it so that it can be cached.
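
To make that concrete, the current shape is roughly this (a simplified sketch, not the actual implementation; concat_with_tf_sql and db_api are stand-ins):

# Simplified sketch of the current behaviour - not the real code.
# The method both mutates shared state and sometimes returns a dataframe.
def _initialise_df_concat_with_tf(self, materialise=False):
    # Mutates the shared pipeline by enqueuing SQL onto it
    self._pipeline.enqueue_sql(concat_with_tf_sql, "__splink__df_concat_with_tf")

    if materialise:
        # Executing flushes the pipeline and returns a SplinkDataFrame,
        # which can then be cached and reused
        return self.db_api.sql_pipeline_to_splink_dataframe(self._pipeline)

    # Otherwise the SQL stays on the pipeline and nothing is returned
    return None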

Can this be handled somehow by the pipeline itself?

Specifically, can we extend the concept of the pipeline to allow us to tell it that some parts of the pipeline need to be materialised?

How does this interact with caching? For any pipeline, can we magically check whether any part of it is already cached?

RobinL commented Mar 14, 2024

Probably need to start with a mini Splink example using the dbapi

RobinL commented Mar 14, 2024

Example code:

from splink import DuckDBAPI, splink_datasets
from splink.pipeline import SQLPipeline

dbapi = DuckDBAPI()

df = splink_datasets.fake_1000

# Register the input data so it can be referred to as "df" in the SQL below
dbapi.register_table(df, "df")

pipeline = SQLPipeline()

sql = """
select first_name, surname
from df
"""
pipeline.enqueue_sql(sql, "df2")

sql = """
select first_name, count(*) as c
from df2
group by first_name
"""
pipeline.enqueue_sql(sql, "final")

# Compile the queued tasks into a single statement, execute it,
# and return the result as a pandas dataframe
dbapi.sql_pipeline_to_splink_dataframe(pipeline).as_pandas_dataframe()
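
For reference, given the WITH-clause compilation shown later in this thread, those two enqueued tasks should compile down to roughly a single statement of this shape (illustrative only):

# Illustrative only - the approximate SQL the pipeline compiles to,
# based on the generate_pipeline logic shown later in this thread
expected_sql = """
WITH df2 as (
select first_name, surname
from df
)
select first_name, count(*) as c
from df2
group by first_name
"""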

RobinL commented Mar 14, 2024

I've tried a solution that modifies SQL tasks to allow them to be materialised:

class SQLTask:
    def __init__(self, sql, output_table_name, materialise=False):
        self.sql = sql
        self.output_table_name = output_table_name
        self.materialise = materialise

And then extends SQLPipeline to allow it to have sub-pipelines (to be subdivided into CTE sequences) which are executed in turn. It's possibly viable, but a big problem is that the pipeline class then needs to somehow be able to predict the name of the output. Maybe that could be enabled by allowing a task to know about its output_table_name but also a physical_output_table_name.

from copy import deepcopy
import logging

import sqlglot
from sqlglot.errors import ParseError
from sqlglot.expressions import Table

logger = logging.getLogger(__name__)


class SQLTask:
    def __init__(self, sql, output_table_name, materialise=False):
        self.sql = sql
        self.output_table_name = output_table_name
        self.materialise = materialise

    @property
    def _uses_tables(self):
        try:
            tree = sqlglot.parse_one(self.sql, read=None)
        except ParseError:
            return ["Failure to parse SQL - tablenames not known"]

        table_names = set()
        for subtree, _parent, _key in tree.walk():
            if type(subtree) is Table:
                table_names.add(subtree.sql())
        return list(table_names)

    @property
    def task_description(self):
        uses_tables = ", ".join(self._uses_tables)
        uses_tables = f" {uses_tables} "

        return (
            f"Task reads tables [{uses_tables}]"
            f" and has output table name: {self.output_table_name}"
        )


class CTEPipeline:
    def __init__(self):
        self.queue = []

    def enqueue_sql(self, sql, output_table_name):
        sql_task = SQLTask(sql, output_table_name)
        self.queue.append(sql_task)

    def generate_pipeline_parts(self, input_dataframes):
        parts = deepcopy(self.queue)
        # Where an input dataframe's physical name differs from its templated
        # name, prepend a task that aliases one to the other
        for df in input_dataframes:
            if not df.physical_and_template_names_equal:
                sql = f"select * from {df.physical_name}"
                task = SQLTask(sql, df.templated_name)
                parts.insert(0, task)
        return parts

    def _log_pipeline(self, parts, input_dataframes):
        if logger.isEnabledFor(7):
            inputs = ", ".join(df.physical_name for df in input_dataframes)
            logger.log(
                7,
                f"SQL pipeline was passed inputs [{inputs}] and output "
                f"dataset {parts[-1].output_table_name}",
            )

            for i, part in enumerate(parts):
                logger.log(7, f"    Pipeline part {i+1}: {part.task_description}")

    # Modify this to return a list
    def generate_pipeline(self, input_dataframes):
        parts = self.generate_pipeline_parts(input_dataframes)

        self._log_pipeline(parts, input_dataframes)

        with_parts = parts[:-1]
        last_part = parts[-1]

        with_parts = [f"{p.output_table_name} as ({p.sql})" for p in with_parts]
        with_parts = ", \n".join(with_parts)
        if with_parts:
            with_parts = f"WITH {with_parts} "

        final_sql = with_parts + last_part.sql

        return final_sql

    @property
    def output_table_name(self):
        return self.queue[-1].output_table_name


class SQLPipeline:
    def __init__(self):
        self.queue = []

    def enqueue_sql(self, sql, output_table_name, materialise=False):
        sql_task = SQLTask(sql, output_table_name, materialise)
        self.queue.append(sql_task)

    def generate_sql(self, input_dataframes):
        cte_sql_statements = []

        current_pipeline = CTEPipeline()
        for task in self.queue:
            current_pipeline.enqueue_sql(task.sql, task.output_table_name)
            # Problem here is that input dataframes
            if task.materialise:
                cte_sql_statements.append(
                    current_pipeline.generate_pipeline(input_dataframes)
                )
                current_pipeline = CTEPipeline()

        # Compile any tasks left after the final materialisation point.
        # They may still need the original input tables - see below.
        if current_pipeline.queue:
            cte_sql_statements.append(
                current_pipeline.generate_pipeline(input_dataframes)
            )

        return cte_sql_statements
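
For concreteness, a hypothetical usage of the sketch above (table names and SQL are illustrative):

# Hypothetical usage of the sketch above
pipeline = SQLPipeline()
pipeline.enqueue_sql(
    "select * from df where first_name is not null", "cleaned", materialise=True
)
pipeline.enqueue_sql(
    "select first_name, count(*) as c from cleaned group by first_name", "counts"
)

# generate_sql returns one compiled statement per CTE sequence: the first ends
# at the materialisation point, and the second reads from its output - which is
# where the "predict the name of the output" problem above bites
statements = pipeline.generate_sql(input_dataframes=[])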

But you still have a problem that, if you have materialisations, it's not clear which input tables need to carry forward to the next step - i.e. it assumes a single linear DAG, whereas in general a CTE sequence could be materialised halfway through but still need the input tables further down.

It's still solvable by carrying everything forwards, but I think at that point we're building a generic DAG implementation, which is probably a bit too much.

However, IIRC materialisation is very rarely required mid-pipeline, perhaps only in predict(), so I'm thinking about just doing a simple option of:

SQL generation methods always do something like:

self._enqueue_df_concat_with_tf(pipeline)

And then, in the special case that materialisation is optionally needed, we just do:

if materialise:
    # pipeline.execute

or whatever.
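
That is, something along these lines - a rough sketch with hypothetical method and attribute names (some_linker_method, db_api), not the final design:

# Rough sketch of the proposed pattern - hypothetical names throughout
def some_linker_method(self, materialise_concat_with_tf=False):
    # Every method starts from a fresh pipeline rather than shared state
    pipeline = SQLPipeline()

    # Helpers only enqueue SQL; they never execute or return dataframes
    self._enqueue_df_concat_with_tf(pipeline)

    if materialise_concat_with_tf:
        # In the rare case materialisation is needed, execute here and
        # carry on with a fresh pipeline; the old one is discarded
        df_concat_with_tf = self.db_api.sql_pipeline_to_splink_dataframe(pipeline)
        pipeline = SQLPipeline()

    # ...enqueue the remaining SQL onto `pipeline` and execute it once at the end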

But I'm trying to stick with a general pattern of the pipeline simply being used to enqueue SQL and then being executed. Once executed it should be discarded - in the long run, we want to prevent enqueuing onto a pipeline once it has been executed.

Will be interesting to observe the extent to which predict() is now decoupled from the linker as I go through this.

RobinL commented Apr 17, 2024

Was closed by #2062

RobinL closed this as completed Apr 17, 2024