-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add pandas utility for dataframe #84
base: master
Are you sure you want to change the base?
Changes from all commits
425643a
24ed4a6
8606b50
e46b26f
bb05d3c
e8cde8d
2529a9b
29b7630
8ca6389
303de9f
204ea7d
2706f58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import pandas as pd | ||
from nisystemlink.clients.dataframe import DataFrameClient | ||
from nisystemlink.clients.dataframe.models import ( | ||
ColumnOrderBy, | ||
DecimationMethod, | ||
DecimationOptions, | ||
QueryDecimatedDataRequest, | ||
QueryTableDataRequest, | ||
) | ||
from nisystemlink.clients.dataframe.utilities import ( | ||
append_pandas_df_to_table, | ||
create_table_from_pandas_df, | ||
create_table_with_data_from_pandas_df, | ||
InvalidColumnTypeError, | ||
InvalidIndexError, | ||
query_decimated_table_data_as_pandas_df, | ||
query_table_data_as_pandas_df, | ||
) | ||
|
||
# Example: round-trip a pandas DataFrame through the DataFrame service using
# the pandas utility helpers.
client = DataFrameClient()
df: pd.DataFrame = pd.DataFrame(
    data=[[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["a", "b", "c"]
)
# Column "a" becomes the dataframe index; the helpers infer the table's index
# column from it.
df.set_index("a", inplace=True)
print(df)

# --- Part 1: create an empty table, append data, then query it back. ---
try:
    table_id = create_table_from_pandas_df(
        client, df, "Example Table", nullable_columns=False
    )
except (InvalidColumnTypeError, InvalidIndexError) as e:
    # Without a table there is nothing to append to or query, so the rest of
    # this part is skipped instead of failing later on an unbound `table_id`.
    print(f"Error creating table: {e}")
else:
    print(f"\nTable created with ID: {table_id}")
    try:
        append_pandas_df_to_table(client, table_id, df)
        print("\nData appended to the table.")

        request = QueryDecimatedDataRequest(
            decimation=DecimationOptions(
                x_column="a",
                y_columns=["b"],
                intervals=1,
                method=DecimationMethod.MaxMin,
            )
        )

        queried_decimated_df = query_decimated_table_data_as_pandas_df(
            client, table_id, query=request, index=True
        )
        print("\nQueried decimated data as pandas dataframe:")
        print(queried_decimated_df.columns)

        query = QueryTableDataRequest(
            columns=["b", "c"], order_by=[ColumnOrderBy(column="b", descending=True)]
        )
        queried_df = query_table_data_as_pandas_df(
            client=client, table_id=table_id, query=query, index=True
        )
        print("\nQueried table data as pandas dataframe:")
        print(queried_df)
    finally:
        # Always remove the example table, even if a step above raised.
        client.delete_table(table_id)
        print(f"\nTable {table_id} deleted successfully.")

# --- Part 2: create a table and upload the data in a single call. ---
try:
    table_id = create_table_with_data_from_pandas_df(client, df, "Example Table")
except (InvalidColumnTypeError, InvalidIndexError) as e:
    print(f"Error creating table: {e}")
else:
    print(f"\nTable created with ID: {table_id}")
    try:
        query = QueryTableDataRequest()
        table_data = query_table_data_as_pandas_df(
            client=client, table_id=table_id, query=query, index=True
        )

        print("\nTable data as pandas DataFrame:")
        print(table_data)
    finally:
        # Always remove the example table, even if the query raised.
        client.delete_table(table_id)
        print(f"\nTable {table_id} deleted successfully.\n")
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from ._pandas_exception import InvalidColumnTypeError, InvalidIndexError | ||
from ._pandas_dataframe_operations import ( | ||
create_table_from_pandas_df, | ||
create_table_with_data_from_pandas_df, | ||
append_pandas_df_to_table, | ||
query_decimated_table_data_as_pandas_df, | ||
query_table_data_as_pandas_df, | ||
) | ||
|
||
# flake8: noqa |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
from typing import Optional | ||
|
||
import pandas as pd | ||
from nisystemlink.clients.dataframe import DataFrameClient | ||
from nisystemlink.clients.dataframe.models import ( | ||
AppendTableDataRequest, | ||
CreateTableRequest, | ||
DataFrame, | ||
QueryDecimatedDataRequest, | ||
QueryTableDataRequest, | ||
) | ||
|
||
from ._pandas_utils import ( | ||
_get_table_index_name, | ||
_infer_dataframe_columns, | ||
_infer_index_column, | ||
) | ||
|
||
|
||
def create_table_from_pandas_df(
    client: DataFrameClient, df: pd.DataFrame, table_name: str, nullable_columns: bool
) -> str:
    """Create a table whose columns are inferred from a pandas DataFrame.

    Only the table definition is created here; no rows from `df` are uploaded.
    The inferred index column is placed first, followed by the inferred data
    columns.

    Args:
        client (DataFrameClient): Instance of DataFrameClient.
        df (pd.DataFrame): Pandas dataframe the columns are inferred from.
        table_name (str): Name of the table.
        nullable_columns (bool): Make the columns nullable. Nullable columns can
            contain `null` values. Forwarded to the data-column inference helper.

    Returns:
        str: ID of the table.
    """
    # Index column first, then every inferred data column.
    schema = [
        _infer_index_column(df),
        *_infer_dataframe_columns(df, nullable_columns),
    ]
    creation_request = CreateTableRequest(name=table_name, columns=schema)
    return client.create_table(creation_request)
|
||
|
||
def append_pandas_df_to_table(
    client: DataFrameClient,
    table_id: str,
    df: pd.DataFrame,
    end_of_data: Optional[bool] = False,
) -> None:
    """Append the rows of `df` to an existing table.

    Args:
        client: Instance of `DataFrameClient`.
        table_id: ID of the table.
        df: Pandas DataFrame containing the data to append.
        end_of_data: Forwarded unchanged as the `end_of_data` field of the
            append request. Defaults to False.

    Returns:
        None
    """
    # Convert the pandas dataframe into the service's DataFrame model.
    payload: DataFrame = DataFrame()
    payload.from_pandas(df)

    append_request = AppendTableDataRequest(frame=payload, end_of_data=end_of_data)
    client.append_table_data(id=table_id, data=append_request)
|
||
|
||
def create_table_with_data_from_pandas_df(
    client: DataFrameClient,
    df: pd.DataFrame,
    table_name: str,
    batch_size: int = 1000,
) -> str:
    """Create a table and upload data from a pandas DataFrame with batching.

    The table is created with non-nullable columns, then `df` is appended in
    consecutive slices of at most `batch_size` rows. The final slice is sent
    with `end_of_data=True`. If `df` has no rows, the table is created and
    nothing is appended.

    Args:
        client (DataFrameClient): Instance of DataFrameClient.
        df (pd.DataFrame): Pandas DataFrame with data to upload.
        table_name (str): Name of the table to create.
        batch_size (int): Number of rows to batch in each upload. Must be at
            least 1. Default is 1000.

    Returns:
        str: ID of the created table.

    Raises:
        ValueError: If `batch_size` is less than 1.
    """
    # Validate before touching the service: a zero step would otherwise crash
    # in range() after the table already exists, and a negative step would
    # silently upload nothing.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}.")

    table_id = create_table_from_pandas_df(
        client=client, df=df, table_name=table_name, nullable_columns=False
    )

    num_rows = df.shape[0]
    for start_row in range(0, num_rows, batch_size):
        end_row = min(start_row + batch_size, num_rows)
        append_pandas_df_to_table(
            client,
            table_id,
            df.iloc[start_row:end_row],
            # Only the last batch marks the end of the data.
            end_of_data=(end_row == num_rows),
        )

    return table_id
|
||
|
||
def query_decimated_table_data_as_pandas_df(
    client: DataFrameClient,
    table_id: str,
    query: QueryDecimatedDataRequest,
    index: bool = False,
) -> pd.DataFrame:
    """Query decimated data from the table as a pandas DataFrame.

    Args:
        client (DataFrameClient): Instance of DataFrameClient.
        table_id (str): ID of the table.
        query (QueryDecimatedDataRequest): Request to query decimated data.
            When `index` is True and `query.columns` is a non-empty list that
            lacks the table's index column, the index column name is appended
            to `query.columns` in place.
        index (bool, optional): Whether index column to be included.
            Defaults to False.

    Returns:
        pd.DataFrame: Table data in pandas dataframe format.
    """
    index_name = None
    if index:
        index_name = _get_table_index_name(client=client, table_id=table_id)
        # An explicit column selection must also carry the index column,
        # otherwise it would be missing from the response.
        if query.columns and index_name and index_name not in query.columns:
            query.columns.append(index_name)

    response = client.query_decimated_data(table_id, query)
    return response.frame.to_pandas(index_name)
|
||
|
||
def query_table_data_as_pandas_df(
    client: DataFrameClient,
    table_id: str,
    query: QueryTableDataRequest,
    index: bool = False,
) -> pd.DataFrame:
    """Query data from the table as a pandas DataFrame, following pagination.

    The query is re-issued for as long as the response carries a continuation
    token, and all pages are concatenated into one dataframe.

    Args:
        client (DataFrameClient): Instance of `DataFrameClient`.
        table_id (str): ID of the table.
        query (QueryTableDataRequest): Request to query data. Note that this
            object is modified in place: `continuation_token` is updated while
            paging, and when `index` is True the table's index column name is
            appended to a non-empty `query.columns` that lacks it.
        index (bool, optional): Whether index column to be included.
            Defaults to False.

    Returns:
        pd.DataFrame: Table data in pandas dataframe format. When `index` is
        False the pages are concatenated with a fresh 0..n-1 index.
    """
    # Must be bound even when `index` is False, because it is passed to
    # `to_pandas` for every page below.
    index_name = None
    if index:
        index_name = _get_table_index_name(client=client, table_id=table_id)
        # An explicit column selection must also carry the index column,
        # otherwise it would be missing from the response.
        if query.columns and index_name and index_name not in query.columns:
            query.columns.append(index_name)

    pages = []
    while True:
        response = client.query_table_data(table_id, query)
        pages.append(response.frame.to_pandas(index_name))

        if not response.continuation_token:
            break
        # Ask for the next page on the following iteration.
        query.continuation_token = response.continuation_token

    return pd.concat(pages, ignore_index=not index)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from typing import Optional

# Column data types that are accepted for a table's index column.
SUPPORTED_INDEX_DATA_TYPE = ["INT32", "INT64", "TIMESTAMP"]


class DataFrameError(Exception):
    """Base class for Dataframe errors."""


class InvalidIndexError(DataFrameError):
    """Raised when an invalid or missing index column is encountered.

    Attributes:
        index_name: Name of the offending index column, or None when the
            data frame has no usable index at all.
        message: Human-readable description, also passed to `Exception`.
    """

    def __init__(self, index_name: Optional[str] = None) -> None:
        self.index_name = index_name
        if index_name:
            # Parenthesized so both literals are concatenated into one message;
            # a bare second literal on its own line would be discarded.
            self.message = (
                f"Column '{index_name}' must be of type {SUPPORTED_INDEX_DATA_TYPE}"
                " to be an index column."
            )
        else:
            self.message = "Data frame must contain one index."
        super().__init__(self.message)


class InvalidColumnTypeError(DataFrameError):
    """Raised when a column has an unsupported data type.

    Attributes:
        column_name: Name of the offending column.
        column_type: The unsupported data type as reported by the caller.
        message: Human-readable description, also passed to `Exception`.
    """

    def __init__(self, column_name: str, column_type: str) -> None:
        self.column_name = column_name
        self.column_type = column_type
        self.message = (
            f"Column '{column_name}' has an unsupported datatype: {column_type}"
        )
        super().__init__(self.message)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could making this field optional lead to an error? This will provide a way to call the API without providing data.