Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Methods in base.py to pull to and from SQL tables, unit tests in test_base.py and successful code formatting and quality checks #1938

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added analytics/mock.db
Binary file not shown.
203 changes: 193 additions & 10 deletions analytics/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions analytics/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pydantic = "^2.0.3"
python = "^3.11"
slack-sdk = "^3.23.0"
typer = { extras = ["all"], version = "^0.9.0" }
sqlalchemy = "^2.0.30"

[tool.poetry.group.dev.dependencies]
black = "^23.7.0"
Expand Down
73 changes: 73 additions & 0 deletions analytics/src/analytics/datasets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Self

import pandas as pd
from sqlalchemy import Engine


class BaseDataset:
Expand All @@ -22,6 +23,78 @@ def from_dict(cls, data: list[dict]) -> Self:
"""Load the dataset from a list of python dictionaries representing records."""
return cls(df=pd.DataFrame(data))

def to_sql(
self,
output_table: str,
engine: Engine,
*,
replace_table: bool = True,
) -> None:
"""Writes the contents of a pandas DataFrame to a SQL table.

This function takes a pandas DataFrame (`self.df`), an output table name (`output_table`),
and a SQLAlchemy Engine object (`engine`) as required arguments. It optionally accepts
a `replace_table` argument (default: True) that determines how existing data in the
target table is handled.

**Parameters:**

* self (required): The instance of the class containing the DataFrame (`self.df`)
to be written to the database.
* output_table (str, required): The name of the table in the database where the
data will be inserted.
* engine (sqlalchemy.engine.Engine, required): A SQLAlchemy Engine object representing
the connection to the database.
* replace_table (bool, default=True):
* If True (default), the function will completely replace the contents of the
existing table with the data from the DataFrame. (if_exists="replace")
* If False, the data from the DataFrame will be appended to the existing table.
(if_exists="append")

**Returns:**

* None

**Raises:**

* Potential exceptions raised by the underlying pandas.to_sql function, such as
database connection errors or errors related to data type mismatches.
"""
if replace_table:
self.df.to_sql(output_table, engine, if_exists="replace", index=False)
else:
self.df.to_sql(output_table, engine, if_exists="append", index=False)

@classmethod
def from_sql(
cls,
source_table: str,
engine: Engine,
) -> Self:
"""Reads data from a SQL table into a pandas DataFrame and creates an instance of the current class.
This function takes a source table name (`source_table`) and a SQLAlchemy Engine object (`engine`) as required arguments. It utilizes pandas.read_sql to retrieve the data from the database and then creates a new instance of the current class (`cls`) initialized with the resulting DataFrame (`df`).

**Parameters:**

* cls (class, required): The class that will be instantiated with the data from the
SQL table. This allows for creating objects of the same type as the function is called on.
* source_table (str, required): The name of the table in the database from which the
data will be read.
* engine (sqlalchemy.engine.Engine, required): A SQLAlchemy Engine object representing
the connection to the database.

**Returns:**

* Self: A new instance of the current class (`cls`) initialized with the DataFrame
containing the data from the SQL table.

**Raises:**

* Potential exceptions raised by the underlying pandas.read_sql function, such as
database connection errors or errors related to data type mismatches.
"""
return cls(df=pd.read_sql(source_table, engine))

def to_csv(
self,
output_file: Path,
Expand Down
21 changes: 21 additions & 0 deletions analytics/src/analytics/integrations/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# pylint: disable=invalid-name
"""Get a connection to the database using a SQLAlchemy engine object."""

from sqlalchemy import Engine, create_engine

from config import settings


def get_db() -> Engine:
    """
    Get a connection to the database using a SQLAlchemy engine object.

    This function retrieves the database connection URL from the configuration
    and creates a SQLAlchemy engine object.

    Returns
    -------
    sqlalchemy.engine.Engine
        A SQLAlchemy engine object representing the connection to the database.
    """
    # pool_pre_ping tests connections for liveness on checkout, avoiding
    # errors from stale pooled connections.
    return create_engine(settings.database_url, pool_pre_ping=True)
25 changes: 25 additions & 0 deletions analytics/tests/datasets/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,38 @@

from analytics.datasets.base import BaseDataset

from analytics.integrations.db import get_db

from config import settings

TEST_DATA = [
{"Col A": 1, "Col b": "One"},
{"Col A": 2, "Col b": "Two"},
{"Col A": 3, "Col b": "Three"},
]


def test_to_and_from_sql():
    """BaseDataset should write to and load from a SQL table with to_sql() and from_sql()."""
    # Setup - create sample dataframe and instantiate class
    test_df = pd.DataFrame(TEST_DATA)
    dataset_in = BaseDataset(test_df)

    # Setup - configure SQL connection; guard against the test accidentally
    # pointing at a real database instead of the local mock
    assert settings.database_url == "sqlite:///mock.db"
    engine = get_db()
    # A concrete, test-specific table name (the committed placeholder
    # "your_table_name" was never replaced).
    table_name = "test_base_dataset"

    # Execution - write to SQL table and read it back
    dataset_in.to_sql(table_name, engine)
    dataset_out = BaseDataset.from_sql(table_name, engine)

    # Validation - the round-tripped dataset matches the original
    assert dataset_in.df.equals(dataset_out.df)


def test_to_and_from_csv(tmp_path: Path):
"""BaseDataset should write to csv with to_csv() and load from a csv with from_csv()."""
# setup - create sample dataframe and instantiate class
Expand Down
Loading