Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add pandas utility for dataframe #84

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/api_reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ nisystemlink.clients.dataframe
.. automodule:: nisystemlink.clients.dataframe.models
:members:
:imported-members:

.. automodule:: nisystemlink.clients.dataframe.utilities
:members:
:imported-members:
19 changes: 19 additions & 0 deletions docs/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ With a :class:`.DataFrameClient` object, you can:

* Export table data in a comma-separated values (CSV) format.

Pandas Utility
~~~~~~~~~~~~~~

Utility functions for managing Pandas DataFrames and interacting with the DataFrame API include:
ancy-augustin marked this conversation as resolved.
Show resolved Hide resolved

* Create a table from a pandas dataframe.

* Append a pandas dataframe to an existing table.

* Query decimated data from a table as a pandas dataframe.

* Query data from a table as a pandas dataframe.
ancy-augustin marked this conversation as resolved.
Show resolved Hide resolved

Examples
~~~~~~~~

Expand All @@ -121,6 +134,12 @@ Export data from a table
:language: python
:linenos:

Table operations using pandas dataframe
ancy-augustin marked this conversation as resolved.
Show resolved Hide resolved

.. literalinclude:: ../examples/dataframe/pandas_dataframe_operations.py
:language: python
:linenos:

Spec API
--------

Expand Down
58 changes: 58 additions & 0 deletions examples/dataframe/pandas_dataframe_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pandas as pd
from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.core import HttpConfiguration
from nisystemlink.clients.dataframe.models import (
DecimationMethod,
DecimationOptions,
QueryDecimatedDataRequest,
QueryTableDataRequest,
)
from nisystemlink.clients.dataframe.utilities import (
append_pandas_df_to_table,
create_table_from_pandas_df,
InvalidColumnTypeError,
InvalidIndexError,
query_decimated_table_data_as_pandas_df,
query_table_data_as_pandas_df,
)

# Example: create a table from a pandas dataframe, append the dataframe's rows,
# query the data back (decimated and in full), then delete the table.
client = DataFrameClient()
df: pd.DataFrame = pd.DataFrame(
    data=[[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["a", "b", "c"]
)
# Column "a" becomes the pandas index.
df.set_index("a", inplace=True)

try:
    table_id = create_table_from_pandas_df(
        client, df, "Example Table", nullable_columns=False
    )
except (InvalidColumnTypeError, InvalidIndexError) as e:
    # Every step below needs a table ID, so stop here instead of running into
    # a NameError on the undefined `table_id`.
    raise SystemExit(f"Error creating table: {e}")
print(f"Table created with ID: {table_id}")

try:
    append_pandas_df_to_table(client, table_id, df)
    print("Data appended to the table.")

    request = QueryDecimatedDataRequest(
        decimation=DecimationOptions(
            x_column="a",
            y_columns=["b"],
            intervals=1,
            method=DecimationMethod.MaxMin,
        )
    )

    queried_decimated_df = query_decimated_table_data_as_pandas_df(
        client, table_id, query=request, index=True
    )
    print("Queried decimated data as pandas dataframe:")
    print(queried_decimated_df.columns)

    query = QueryTableDataRequest()
    queried_df = query_table_data_as_pandas_df(
        client=client, table_id=table_id, query=query, index=True
    )
    print("Queried table data as pandas dataframe:")
    print(queried_df)
finally:
    # Delete the example table even when one of the steps above fails.
    client.delete_table(table_id)
28 changes: 27 additions & 1 deletion nisystemlink/clients/dataframe/models/_data_frame.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional

import pandas as pd
from nisystemlink.clients.core._uplink._json_model import JsonModel


Expand Down Expand Up @@ -52,6 +53,31 @@ class DataFrame(JsonModel):
columns: Optional[List[str]] = None
"""The names and order of the columns included in the data frame."""

data: List[List[Optional[str]]]
data: List[List[Optional[str]]] = None
"""The data for each row with the order specified in the columns property.
Must contain a value for each column in the columns property."""

def from_pandas(self, df: pd.DataFrame) -> None:
    """Populate this `DataFrame` from a pandas dataframe.

    The name of the pandas index becomes the first column, followed by the
    dataframe's own column names. Every value, including the index value of
    each row, is serialized with ``str``.

    Args:
        df (pd.DataFrame): Pandas dataframe.
    """
    self.columns = [df.index.name] + df.columns.astype(str).tolist()
    # itertuples() yields each value with its own column's dtype. iterrows()
    # would coerce a whole row to one common dtype first, so an integer value
    # next to a float column would be serialized as "2.0" instead of "2".
    self.data = [
        [str(value) for value in row]
        for row in df.itertuples(index=True, name=None)
    ]

def to_pandas(self, index: Optional[str] = None) -> pd.DataFrame:
    """Build a pandas dataframe from this `DataFrame`.

    Args:
        index (Optional[str]): Name of the column to use as the pandas index.
            When omitted (or empty), the default pandas index is kept.

    Returns:
        pd.DataFrame: Dataframe built from `data`, with `columns` as labels.
    """
    frame = pd.DataFrame(data=self.data, columns=self.columns)
    return frame.set_index(index) if index else frame
9 changes: 9 additions & 0 deletions nisystemlink/clients/dataframe/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ._pandas_exception import InvalidColumnTypeError, InvalidIndexError
from ._pandas_dataframe_operations import (
create_table_from_pandas_df,
append_pandas_df_to_table,
query_decimated_table_data_as_pandas_df,
query_table_data_as_pandas_df,
)

# flake8: noqa
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import pandas as pd
from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import (
AppendTableDataRequest,
CreateTableRequest,
DataFrame,
QueryDecimatedDataRequest,
QueryTableDataRequest,
)

from ._pandas_utils import (
_get_table_index_name,
_infer_dataframe_columns,
_infer_index_column,
)


def create_table_from_pandas_df(
    client: DataFrameClient, df: pd.DataFrame, table_name: str, nullable_columns: bool
) -> str:
    """Create a table whose columns are inferred from a pandas dataframe.

    The index column is inferred first and placed ahead of the columns
    inferred from the rest of the dataframe.

    Args:
        client (DataFrameClient): Instance of DataFrameClient.
        df (pd.DataFrame): Pandas dataframe the columns are inferred from.
        table_name (str): Name of the table.
        nullable_columns (bool): Make the columns nullable (forwarded to the
            column inference helper).

    Returns:
        str: ID of the table.
    """
    columns = [
        _infer_index_column(df),
        *_infer_dataframe_columns(df, nullable_columns),
    ]
    request = CreateTableRequest(name=table_name, columns=columns)
    return client.create_table(request)


def append_pandas_df_to_table(
    client: DataFrameClient, table_id: str, df: pd.DataFrame
) -> None:
    """Append the rows of a pandas dataframe to an existing table.

    The dataframe is converted with `DataFrame.from_pandas` and sent with
    `end_of_data=False`.

    Args:
        client: Instance of `DataFrameClient`.
        table_id: ID of the table.
        df: Pandas DataFrame containing the data to append.

    Returns:
        None
    """
    payload = DataFrame()
    payload.from_pandas(df)
    request = AppendTableDataRequest(frame=payload, end_of_data=False)
    client.append_table_data(id=table_id, data=request)


def query_decimated_table_data_as_pandas_df(
    client: DataFrameClient,
    table_id: str,
    query: QueryDecimatedDataRequest,
    index: bool = False,
) -> pd.DataFrame:
    """Query decimated data from the table as a pandas dataframe.

    Args:
        client (DataFrameClient): Instance of DataFrameClient.
        table_id (str): ID of the table.
        query (QueryDecimatedDataRequest): Request to query decimated data.
            When `index` is true and the request restricts its columns, the
            index column name is appended to `query.columns` if missing.
        index (bool, optional): Whether the table's index column is included
            and set as the index of the returned dataframe. Defaults to False.

    Returns:
        pd.DataFrame: Table data in pandas dataframe format.
    """
    index_name = None
    if index:
        # The index name is needed for the returned dataframe even when the
        # query has no column filter. The filter is only extended when one
        # is actually set.
        index_name = _get_table_index_name(client=client, table_id=table_id)
        if query.columns and index_name not in query.columns:
            query.columns.append(index_name)
    response = client.query_decimated_data(table_id, query)
    return response.frame.to_pandas(index_name)


def query_table_data_as_pandas_df(
    client: DataFrameClient,
    table_id: str,
    query: QueryTableDataRequest,
    index: bool = False,
) -> pd.DataFrame:
    """Query all pages of data from the table as a single pandas dataframe.

    Args:
        client (DataFrameClient): Instance of `DataFrameClient`.
        table_id (str): ID of the table.
        query (QueryTableDataRequest): Request to query data. When `index` is
            true and the request restricts its columns, the index column name
            is appended to `query.columns` if missing. The request's
            continuation token is used for paging and restored before
            returning.
        index (bool, optional): Whether the table's index column is included
            and set as the index of the returned dataframe. Defaults to False.

    Returns:
        pd.DataFrame: Table data in pandas dataframe format.
    """
    index_name = None
    if index:
        index_name = _get_table_index_name(client=client, table_id=table_id)
        if query.columns and index_name not in query.columns:
            query.columns.append(index_name)

    pages = []
    # Paging advances the token on the caller's request object. Remember the
    # original value so the request is not left pointing at the last page.
    initial_token = query.continuation_token
    try:
        while True:
            response = client.query_table_data(table_id, query)
            pages.append(response.frame.to_pandas(index_name))
            if not response.continuation_token:
                break
            query.continuation_token = response.continuation_token
    finally:
        query.continuation_token = initial_token

    # Without an index column the per-page default indexes would repeat, so
    # renumber the rows. With one, keep the index values from the table.
    return pd.concat(pages, ignore_index=not index)
27 changes: 27 additions & 0 deletions nisystemlink/clients/dataframe/utilities/_pandas_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
class DataFrameError(Exception):
    """Root of the dataframe error hierarchy; catch it to handle any subclass."""


class InvalidIndexError(DataFrameError):
    """Signals a missing index column or one with an unsupported type.

    Attributes:
        index_name: Name of the offending column, or None when no name was given.
        message: Human-readable description, also passed to `Exception`.
    """

    def __init__(self, index_name: str = None) -> None:
        self.index_name = index_name
        if index_name:
            # A name was supplied: report that this column has the wrong type.
            self.message = f"Column '{self.index_name}' must be of type INT32, INT64, or TIMESTAMP to be an index column."
        else:
            self.message = "Data frame must contain one index."
        super().__init__(self.message)


class InvalidColumnTypeError(DataFrameError):
    """Signals a column whose data type is not supported.

    Attributes:
        column_name: Name of the offending column.
        column_type: The unsupported data type, as text.
        message: Human-readable description, also passed to `Exception`.
    """

    def __init__(self, column_name: str, column_type: str) -> None:
        description = f"Column '{column_name}' has an unsupported datatype: {column_type}"
        self.column_name = column_name
        self.column_type = column_type
        self.message = description
        super().__init__(description)
Loading
Loading