Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writing nested frames to parquet files #83

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/nested_pandas/datasets/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,32 @@
return base_nf
else:
raise TypeError("Input to n_layer is not an int or dict.")


def generate_parquet_file(n_base, n_layer, path, file_per_layer=False, seed=None):
    """Generate a toy nested dataset and write it out as parquet.

    The dataset is built with `generate_data` and then handed to the
    resulting frame's `to_parquet` method.

    Parameters
    ----------
    n_base : int
        Number of rows in the base layer.
    n_layer : int, or dict
        Number of nested rows to generate per base row. A dictionary of
        ``{layer label: layer size}`` pairs may be given instead to create
        several nested columns, each with its own size.
    path : str
        Where to write. When `file_per_layer` is False this is the path of
        the single parquet file; when it is True this is the directory that
        receives one parquet file per layer.
    file_per_layer : bool, default=False
        If True, each layer is written to its own parquet file. If False,
        the generated data is written to a single parquet file representing
        the whole nested dataset.
    seed : int, default=None
        Seed forwarded to the random data generation.

    Returns
    -------
    None
    """
    # Build the frame and write it in one step; `file_per_layer` maps directly
    # onto the writer's `by_layer` switch.
    generate_data(n_base, n_layer, seed).to_parquet(path, by_layer=file_per_layer)

Check warning on line 84 in src/nested_pandas/datasets/generation.py

View check run for this annotation

Codecov / codecov/patch

src/nested_pandas/datasets/generation.py#L83-L84

Added lines #L83 - L84 were not covered by tests
49 changes: 49 additions & 0 deletions src/nested_pandas/nestedframe/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# typing.Self and "|" union syntax don't exist in Python 3.9
from __future__ import annotations

import os

import numpy as np
import pandas as pd
import pyarrow as pa
Expand Down Expand Up @@ -438,3 +440,50 @@ def translate_cols(frame, layer, col):
axis=1, # to apply func on each row of our nested frame)
)
return result

def to_parquet(self, path, by_layer=False, **kwargs) -> None:
    """Write this NestedFrame to one or more parquet files.

    The frame is stored either as a single parquet file, with every nested
    dataset packed into its own column, or as one parquet file per layer.
    The pyarrow engine is always used for writing.

    Parameters
    ----------
    path : str
        If `by_layer` is False, the path of the parquet file to write.
        If `by_layer` is True, the path to an existing directory that will
        receive the per-layer files.
    by_layer : bool, default False
        If False, the entire NestedFrame is written to a single parquet file.

        If True, each layer is written to a separate parquet file inside the
        directory given by `path`. Each file is named after its layer
        followed by the ".parquet" extension; the base layer is always
        written to "base.parquet".
    kwargs : keyword arguments, optional
        Extra keyword arguments forwarded to every underlying `to_parquet`
        call. `engine` is already supplied as "pyarrow" on those calls, so
        it should not be passed here.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If `by_layer` is True and `path` is not an existing directory.
    """
    # Single-file case: hand the whole frame to the parent class's writer.
    if not by_layer:
        super().to_parquet(path, engine="pyarrow", **kwargs)
        return None

    # Per-layer output needs a directory that is already there.
    if not os.path.isdir(path):
        raise ValueError("The provided path must be an existing directory if by_layer=True")

    # The base layer is the frame with all of its nested columns removed.
    without_nested = self.drop(columns=self.nested_columns, inplace=False)
    without_nested.to_parquet(os.path.join(path, "base.parquet"), by_layer=False, **kwargs)

    # Every remaining entry of `all_columns` names a nested layer; each one
    # is flattened and written to "<layer>.parquet".
    nested_names = [name for name in self.all_columns if name != "base"]
    for name in nested_names:
        target = os.path.join(path, f"{name}.parquet")
        self[name].nest.to_flat().to_parquet(target, engine="pyarrow", **kwargs)
    return None
10 changes: 6 additions & 4 deletions src/nested_pandas/nestedframe/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

def read_parquet(
data: FilePath | ReadBuffer[bytes],
to_pack: dict,
to_pack: dict | None = None,
columns: list[str] | None = None,
pack_columns: dict | None = None,
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
Expand All @@ -40,10 +40,11 @@ def read_parquet(
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
to_pack: dict,
to_pack: dict, default=None
A dictionary of parquet data paths (same criteria as `data`), where
each key reflects the desired column name to pack the data into and
each value reflects the parquet data to pack.
each value reflects the parquet data to pack. If None, it assumes
that any data to pack is already packed as a column within `data`.
columns : list, default=None
If not None, only these columns will be read from the file.
pack_columns: dict, default=None
Expand All @@ -64,7 +65,8 @@ def read_parquet(
"""

df = NestedFrame(pd.read_parquet(data, engine="pyarrow", columns=columns, dtype_backend=dtype_backend))

if to_pack is None:
return df
for pack_key in to_pack:
col_subset = pack_columns.get(pack_key, None) if pack_columns is not None else None
packed = pd.read_parquet(
Expand Down
78 changes: 77 additions & 1 deletion tests/nested_pandas/nestedframe/test_io.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import tempfile

import pandas as pd
import pytest
from nested_pandas import read_parquet
from nested_pandas import NestedFrame, read_parquet
from pandas.testing import assert_frame_equal


@pytest.mark.parametrize("columns", [["a"], None])
Expand Down Expand Up @@ -57,3 +59,77 @@ def test_read_parquet(tmp_path, columns, pack_columns):
assert nf[nested_col].nest.fields == nested1.columns.tolist()
elif nested_col == "nested2":
assert nf[nested_col].nest.fields == nested2.columns.tolist()


def test_write_packed_parquet():
    """Round-trip a NestedFrame through a single packed parquet file."""
    # Generate some test data: a three-row base and two nested layers that
    # share the base index.
    base = pd.DataFrame(data={"a": [1, 2, 3], "b": [2, 4, 6]}, index=[0, 1, 2])

    nested1 = pd.DataFrame(
        data={"c": [0, 2, 4, 1, 4, 3, 1, 4, 1], "d": [5, 4, 7, 5, 3, 1, 9, 3, 4]},
        index=[0, 0, 0, 1, 1, 1, 2, 2, 2],
    )

    nested2 = pd.DataFrame(
        data={"e": [0, 2, 4, 1, 4, 3, 1, 4, 1], "f": [5, 4, 7, 5, 3, 1, 9, 3, 4]},
        index=[0, 0, 0, 1, 1, 1, 2, 2, 2],
    )

    # Construct the NestedFrame
    nf = NestedFrame(base).add_nested(nested1, name="nested1").add_nested(nested2, name="nested2")

    # Write inside a managed temporary directory rather than to the name of
    # an open NamedTemporaryFile: the directory is always cleaned up, and no
    # file handle is left open while another writer targets the same path.
    with tempfile.TemporaryDirectory() as tmp_dir:
        packed_path = os.path.join(tmp_dir, "packed.parquet")
        nf.to_parquet(packed_path)

        # Read from parquet and check the round trip is lossless
        nf2 = read_parquet(packed_path)
        assert_frame_equal(nf, nf2)


def test_write_parquet_by_layer():
    """Tests writing a nested frame to one parquet file per layer."""
    base = pd.DataFrame(data={"a": [1, 2, 3], "b": [2, 4, 6]}, index=[0, 1, 2])

    nested1 = pd.DataFrame(
        data={"c": [0, 2, 4, 1, 4, 3, 1, 4, 1], "d": [5, 4, 7, 5, 3, 1, 9, 3, 4]},
        index=[0, 0, 0, 1, 1, 1, 2, 2, 2],
    )

    nested2 = pd.DataFrame(
        data={"e": [0, 2, 4, 1, 4, 3, 1, 4, 1], "f": [5, 4, 7, 5, 3, 1, 9, 3, 4]},
        index=[0, 0, 0, 1, 1, 1, 2, 2, 2],
    )

    # Construct the NestedFrame
    nf = NestedFrame(base).add_nested(nested1, name="nested1").add_nested(nested2, name="nested2")

    # A context manager guarantees the temporary directory is removed even if
    # an assertion below fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Assert that the path must be an existing directory when by_layer is
        # True. The path is deliberately one that does not exist, instead of
        # relying on a temporary file being garbage-collected first.
        not_a_directory = os.path.join(tmp_dir, "not_a_directory.parquet")
        with pytest.raises(ValueError):
            nf.to_parquet(not_a_directory, by_layer=True)

        # Write each layer to its own parquet file inside the directory
        nf.to_parquet(tmp_dir, by_layer=True)

        base_path = os.path.join(tmp_dir, "base.parquet")
        nested1_path = os.path.join(tmp_dir, "nested1.parquet")
        nested2_path = os.path.join(tmp_dir, "nested2.parquet")

        # Validate the individual layers were correctly saved as their own parquet files
        read_base_frame = read_parquet(base_path, to_pack=None)
        assert_frame_equal(read_base_frame, nf.drop(columns=["nested1", "nested2"]))

        read_nested1 = read_parquet(nested1_path, to_pack=None)
        assert_frame_equal(read_nested1, nf["nested1"].nest.to_flat())

        read_nested2 = read_parquet(nested2_path, to_pack=None)
        assert_frame_equal(read_nested2, nf["nested2"].nest.to_flat())

        # Validate the entire NestedFrame can be reassembled from the layer files
        entire_nf = read_parquet(
            data=base_path,
            to_pack={
                "nested1": nested1_path,
                "nested2": nested2_path,
            },
        )

        assert_frame_equal(nf, entire_nf)