feat: merge_parquet (#97)
* Made parquet merge, works but needs max_files_open?

* added to test

* Fixed for nested arrays, added test

* globbing
zbilodea authored Jul 19, 2024
1 parent b104c2a commit acdee53
Showing 3 changed files with 335 additions and 0 deletions.
223 changes: 223 additions & 0 deletions src/hepconvert/merge.py
@@ -14,6 +14,229 @@
from hepconvert.histogram_adding import _hadd_1d, _hadd_2d, _hadd_3d


def merge_parquet(
out_file,
in_files,
*,
# max_files=2,
force=False,
list_to32=False,
string_to32=True,
bytestring_to32=True,
emptyarray_to=None,
categorical_as_dictionary=False,
extensionarray=True,
count_nulls=True,
compression="zstd",
compression_level=None,
row_group_size=64 * 1024 * 1024,
data_page_size=None,
parquet_flavor=None,
parquet_version="2.4",
parquet_page_version="1.0",
parquet_metadata_statistics=True,
parquet_dictionary_encoding=False,
parquet_byte_stream_split=False,
parquet_coerce_timestamps=None,
parquet_old_int96_timestamps=None,
parquet_compliant_nested=False,
parquet_extra_options=None,
storage_options=None,
skip_bad_files=False,
):
"""Merges Parquet files together.
Args:
    :param out_file: Name of the output file or file path.
    :type out_file: path-like
    :param in_files: List of local Parquet files to merge, or a single directory
        path; directories are searched recursively for ``*.parquet`` files.
    :type in_files: str or list of str
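    :param force: If True, overwrites the output file if it already exists.
    :type force: bool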
:param list_to32: If True, convert Awkward lists into 32-bit Arrow lists if they're small enough, even if it means an extra conversion.
Otherwise, signed 32-bit ak.types.ListType maps to Arrow ListType, signed 64-bit ak.types.ListType maps to Arrow LargeListType, and
unsigned 32-bit ak.types.ListType picks whichever Arrow type its values fit into. Command line option ``--list-to32``.
:type list_to32: bool
:param string_to32: Same as the above for Arrow string and ``large_string``. Command line option: ``--string-to32``.
:type string_to32: bool
:param bytestring_to32: Same as the above for Arrow binary and ``large_binary``. Command line option: ``--bytestring-to32``.
:type bytestring_to32: bool
    :param emptyarray_to: If None, #ak.types.UnknownType maps to Arrow's
        null type; otherwise, it is converted to a given numeric dtype. Command line option: ``--emptyarray-to``.
:type emptyarray_to: None or dtype
:param categorical_as_dictionary: If True, #ak.contents.IndexedArray and
#ak.contents.IndexedOptionArray labeled with ``__array__ = "categorical"``
are mapped to Arrow `DictionaryArray`; otherwise, the projection is
evaluated before conversion (always the case without
`__array__ = "categorical"`). Command line option: ``--categorical-as-dictionary``.
:type categorical_as_dictionary: bool
:param extensionarray: If True, this function returns extended Arrow arrays
        (at all levels of nesting), which preserve metadata so that Awkward →
        Arrow → Awkward preserves the array's #ak.types.Type (though not
the #ak.forms.Form). If False, this function returns generic Arrow arrays
that might be needed for third-party tools that don't recognize Arrow's
extensions. Even with `extensionarray=False`, the values produced by
Arrow's `to_pylist` method are the same as the values produced by Awkward's
#ak.to_list. Command line option: ``--extensionarray``.
:type extensionarray: bool
:param count_nulls: If True, count the number of missing values at each level
and include these in the resulting Arrow array, which makes some downstream
applications faster. If False, skip the up-front cost of counting them.
Command line option: ``--count-nulls``.
:type count_nulls: bool
:param compression: Compression algorithm name, passed to
`pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__.
Parquet supports `{"NONE", "SNAPPY", "GZIP", "BROTLI", "LZ4", "ZSTD"}`
(where `"GZIP"` is also known as "zlib" or "deflate"). If a dict, the keys
are column names (the same column names that #ak.forms.Form.columns returns
and #ak.forms.Form.select_columns accepts) and the values are compression
algorithm names, to compress each column differently. Command line option: ``--compression``.
:type compression: None, str, or dict
:param compression_level: Compression level, passed to
`pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__.
Compression levels have different meanings for different compression
algorithms: GZIP ranges from 1 to 9, but ZSTD ranges from -7 to 22, for
        example. Generally, higher numbers provide slower but smaller compression. Command line option:
        ``--compression-level``.
    :type compression_level: None, int, or dict
:param row_group_size: Maximum number of entries in each row group,
passed to `pyarrow.parquet.ParquetWriter.write_table <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html#pyarrow.parquet.ParquetWriter.write_table>`__.
If None, the Parquet default of 64 MiB is used. Command line options: ``-rg`` or ``--row-group-size``.
:type row_group_size: int or None
:param data_page_size: Number of bytes in each data page, passed to
`pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__.
If None, the Parquet default of 1 MiB is used. Command line option: ``--data-page-size``.
:type data_page_size: None or int
:param parquet_flavor: If None, the output Parquet file will follow
Arrow conventions; if `"spark"`, it will follow Spark conventions. Some
systems, such as Spark and Google BigQuery, might need Spark conventions,
while others might need Arrow conventions. Passed to
        `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `flavor`. Command line option: ``--parquet-flavor``.
:type parquet_flavor: None or `"spark"`
:param parquet_version: Parquet file format version.
        Passed to `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `version`. Command line option: ``--parquet-version``.
:type parquet_version: `"1.0"`, `"2.4"`, or `"2.6"`
:param parquet_page_version: Parquet page format version.
        Passed to `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `data_page_version`. Command line option: ``--parquet-page-version``.
:type parquet_page_version: `"1.0"` or `"2.0"`
:param parquet_metadata_statistics: If True, include summary
statistics for each data page in the Parquet metadata, which lets some
applications search for data more quickly (by skipping pages). If a dict
mapping column names to bool, include summary statistics on only the
specified columns. Passed to
        `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `write_statistics`. Command line option: ``--parquet-metadata-statistics``.
:type parquet_metadata_statistics: bool or dict
:param parquet_dictionary_encoding: If True, allow Parquet to pre-compress
with dictionary encoding. If a dict mapping column names to bool, only
use dictionary encoding on the specified columns. Passed to
        `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `use_dictionary`. Command line option: ``--parquet-dictionary-encoding``.
:type parquet_dictionary_encoding: bool or dict
:param parquet_byte_stream_split: If True, pre-compress floating
point fields (`float32` or `float64`) with byte stream splitting, which
collects all mantissas in one part of the stream and exponents in another.
        Passed to `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `use_byte_stream_split`. Command line option: ``--parquet-byte-stream-split``.
:type parquet_byte_stream_split: bool or dict
:param parquet_coerce_timestamps: If None, any timestamps
(`datetime64` data) are coerced to a given resolution depending on
`parquet_version`: version `"1.0"` and `"2.4"` are coerced to microseconds,
but later versions use the `datetime64`'s own units. If `"ms"` is explicitly
specified, timestamps are coerced to milliseconds; if `"us"`, microseconds.
        Passed to `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `coerce_timestamps`. Command line option: ``--parquet-coerce-timestamps``.
:type parquet_coerce_timestamps: None, `"ms"`, or `"us"`
:param parquet_old_int96_timestamps: If True, use Parquet's INT96 format
for any timestamps (`datetime64` data), taking priority over `parquet_coerce_timestamps`.
If None, let the `parquet_flavor` decide. Passed to
        `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `use_deprecated_int96_timestamps`. Command line option: ``--parquet-old-int96-timestamps``.
:type parquet_old_int96_timestamps: None or bool
:param parquet_compliant_nested: If True, use the Spark/BigQuery/Parquet
`convention for nested lists <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types>`__,
in which each list is a one-field record with field name "`element`";
otherwise, use the Arrow convention, in which the field name is "`item`".
        Passed to `pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__
        as `use_compliant_nested_type`. Command line option: ``--parquet-compliant-nested``.
    :type parquet_compliant_nested: bool
:param parquet_extra_options: Any additional options to pass to
`pyarrow.parquet.ParquetWriter <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html>`__.
:type parquet_extra_options: None or dict
:param storage_options: Any additional options to pass to
`fsspec.core.url_to_fs <https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.url_to_fs>`__
to open a remote file for writing.
:type storage_options: None or dict
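    :param skip_bad_files: If True, skips corrupt or missing input files instead of raising an error.
    :type skip_bad_files: bool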
Examples:
---------
    Merges Parquet files into one output file.
    >>> hepconvert.merge_parquet(out_file="merged.parquet", in_files=["file1.parquet", "file2.parquet"])
Command Line Instructions:
--------------------------
This function can be run from the command line. Use command
.. code-block:: bash
"""
    if not isinstance(in_files, list) and not isinstance(in_files, tuple):
        # A single path was given; expand it to all Parquet files beneath it.
        path = Path(in_files)
        in_files = sorted(path.glob("**/*.parquet"))
if len(in_files) < 2:
msg = f"Must have at least 2 files to merge, not {len(in_files)} files."
raise AttributeError(msg)
    path = Path(out_file)
    if Path.is_file(path) and not force:
        msg = f"File {out_file} already exists. To overwrite it, set force=True."
        raise FileExistsError(msg)

data = False
for file in in_files:
try:
ak.metadata_from_parquet(file)
except FileNotFoundError:
if skip_bad_files:
continue
            msg = f"File: {file} does not exist or is corrupt."
raise FileNotFoundError(msg) from None
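        # The first readable file initializes `data`; each later file is concatenated
        # and the union of record types is merged, so fields missing from some files
        # become option-type (None-filled) columns.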
if isinstance(data, bool):
data = ak.from_parquet(file)
else:
data = ak.merge_union_of_records(
ak.concatenate((data, ak.from_parquet(file))), axis=0
)

ak.to_parquet(
data,
out_file,
list_to32=list_to32,
string_to32=string_to32,
bytestring_to32=bytestring_to32,
emptyarray_to=emptyarray_to,
categorical_as_dictionary=categorical_as_dictionary,
extensionarray=extensionarray,
count_nulls=count_nulls,
compression=compression,
compression_level=compression_level,
row_group_size=row_group_size,
data_page_size=data_page_size,
parquet_flavor=parquet_flavor,
parquet_version=parquet_version,
parquet_page_version=parquet_page_version,
parquet_metadata_statistics=parquet_metadata_statistics,
parquet_dictionary_encoding=parquet_dictionary_encoding,
parquet_byte_stream_split=parquet_byte_stream_split,
parquet_coerce_timestamps=parquet_coerce_timestamps,
parquet_old_int96_timestamps=parquet_old_int96_timestamps,
parquet_compliant_nested=parquet_compliant_nested,
parquet_extra_options=parquet_extra_options,
storage_options=storage_options,
)


def merge_root(
destination,
files,
112 changes: 112 additions & 0 deletions tests/test_merge_parquet.py
@@ -0,0 +1,112 @@
from __future__ import annotations

from pathlib import Path

import awkward as ak
import pytest

from hepconvert import merge, root_to_parquet

skhep_testdata = pytest.importorskip("skhep_testdata")


def simple_test(tmp_path):
arr1 = ak.Array(
{
"a": [
1,
2,
],
"b": [
1,
2,
],
"c": [
1,
2,
],
}
)
ak.to_parquet(arr1, Path(tmp_path / "arr1.parquet"))
arr2 = ak.Array(
{
"a": [7, 8, 9],
"b": [
3,
4,
5,
],
}
)
ak.to_parquet(arr2, Path(tmp_path / "arr2.parquet"))
arr3 = ak.Array(
{
"a": [10, 11, 12, 13, 14],
"c": [3, 4, 5, 6, 7],
"d": [1, 2, 3, 4, 5],
}
)
ak.to_parquet(arr3, Path(tmp_path / "arr3.parquet"))

merge.merge_parquet(
Path(tmp_path / "new.parquet"),
[
Path(tmp_path / "arr1.parquet"),
Path(tmp_path / "arr2.parquet"),
Path(tmp_path / "arr3.parquet"),
],
force=True,
)
array = ak.from_parquet(Path(tmp_path / "new.parquet"))
assert ak.all(array["a"] == [1, 2, 7, 8, 9, 10, 11, 12, 13, 14])
assert ak.all(
array["b"]
== [
1,
2,
3,
4,
5,
None,
None,
None,
None,
None,
]
)
assert ak.all(array["c"] == [1, 2, None, None, None, 3, 4, 5, 6, 7])
assert ak.all(
array["d"]
== [
None,
None,
None,
None,
None,
1,
2,
3,
4,
5,
]
)


def HZZ_test(tmp_path):
    # Assumes uproot-HZZ.parquet and merged_hzz.root already exist in tmp_path.
    merge.merge_parquet(
        Path(tmp_path / "merged_hzz.parquet"),
        [
            Path(tmp_path / "uproot-HZZ.parquet"),
            Path(tmp_path / "uproot-HZZ.parquet"),
        ],
        force=True,
    )
    # Read the merged output before root_to_parquet overwrites it below.
    new_arrays = ak.from_parquet(Path(tmp_path / "merged_hzz.parquet"))
    root_to_parquet(
        Path(tmp_path / "merged_hzz.root"),
        Path(tmp_path / "merged_hzz.parquet"),
        force=True,
    )
    test = ak.from_parquet(Path(tmp_path / "merged_hzz.parquet"))
    for key in new_arrays.fields:
        assert ak.all(new_arrays[key] == test[key])
File renamed without changes.
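For reference, a minimal usage sketch of the new function; the file names here are hypothetical, and the import mirrors the test file above:

    from hepconvert import merge

    # Merge two local Parquet files into one output file.
    # force=True overwrites "merged.parquet" if it already exists.
    merge.merge_parquet(
        "merged.parquet",
        ["run1.parquet", "run2.parquet"],
        compression="zstd",
        force=True,
    )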

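The None-filled columns asserted in simple_test come from how Awkward combines records with mismatched fields; a small standalone illustration (assumes only the awkward package, not hepconvert):

    import awkward as ak

    a = ak.Array([{"x": 1, "y": 10}, {"x": 2, "y": 20}])
    b = ak.Array([{"x": 3, "z": 30}])

    # concatenate yields a union of the two record types; merge_union_of_records
    # collapses it into one record type whose fields are the union of all fields,
    # with missing entries becoming None, e.g. {"x": 3, "y": None, "z": 30}.
    merged = ak.merge_union_of_records(ak.concatenate((a, b)), axis=0)
    print(merged.tolist())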