
Merge pull request #84 from martindurant/repacking
Rename un/merge, allow groupby listify
martindurant authored Nov 13, 2024
2 parents 5f5c752 + f80f443 commit 9f64d31
Showing 11 changed files with 159 additions and 30 deletions.
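The rename in brief: the dataframe accessor's merge() becomes pack(), and the series accessor's unmerge() becomes unpack(). A minimal sketch of the round-trip, mirroring the updated polars tests below (that importing akimbo.polars registers the .ak accessor is an assumption of this sketch):

import polars as pl

import akimbo.polars  # noqa: F401  (assumed: importing registers the .ak accessor)

s = pl.Series([{"a": [1, 2, 3], "b": [1, 2, 3]}] * 3)
df = s.ak.unpack()  # formerly s.ak.unmerge(): record fields become columns
s2 = df.ak.pack()   # formerly df.ak.merge(): columns packed back into one series
assert s.to_list() == s2.to_list()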
8 changes: 4 additions & 4 deletions docs/demo/akimbo-demo.ipynb
@@ -945,7 +945,7 @@
"id": "5c253d82-c1bf-4535-9e61-fa4b8bde799c",
"metadata": {},
"source": [
"We can convert back to an awkward Record array with merge:"
"We can convert back to an awkward Record array with pack:"
]
},
{
@@ -955,7 +955,7 @@
"metadata": {},
"outputs": [],
"source": [
"merged = akimbo.merge(df[[\"run\", \"luminosityBlock\"]])"
"merged = akimbo.pack(df[[\"run\", \"luminosityBlock\"]])"
]
},
{
@@ -1024,7 +1024,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
]
},
{
@@ -1051,7 +1051,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
]
},
{
6 changes: 3 additions & 3 deletions docs/quickstart.ipynb
@@ -351,7 +351,7 @@
"id": "d60887e8-582b-474f-a79f-173bc62c4bd1",
"metadata": {},
"source": [
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unmerge``."
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unpack``."
]
},
{
@@ -361,7 +361,7 @@
"metadata": {},
"outputs": [],
"source": [
"df = data.ak.unmerge()"
"df = data.ak.unpack()"
]
},
{
@@ -591,7 +591,7 @@
"metadata": {},
"outputs": [],
"source": [
"s = df.ak.merge()"
"s = df.ak.pack()"
]
},
{
2 changes: 1 addition & 1 deletion example/cudf-ak.ipynb
@@ -273,7 +273,7 @@
" 'typestr',\n",
" 'typetracer',\n",
" 'unflatten',\n",
" 'unmerge',\n",
" 'unpack',\n",
" 'unzip',\n",
" 'validity_error',\n",
" 'values_astype',\n",
5 changes: 5 additions & 0 deletions src/akimbo/cudf.py
@@ -103,6 +103,11 @@ class CudfAwkwardAccessor(Accessor):
     series_type = Series
     dataframe_type = DataFrame

+    @classmethod
+    def _arrow_to_series(cls, data):
+        # this implies CPU->GPU copy
+        return Series(data)
+
     @classmethod
     def _to_output(cls, arr):
         if isinstance(arr, ak.Array):
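Since _arrow_to_series receives host-side arrow data, constructing the cudf Series is the point where the device copy happens. A hypothetical call (assuming a CUDA environment with cudf installed, and that cudf.Series accepts arrow data directly, as the committed code implies):

import pyarrow as pa

from akimbo.cudf import CudfAwkwardAccessor

host = pa.array([[1, 2], [], [3]])  # arrow data in CPU memory
gpu = CudfAwkwardAccessor._arrow_to_series(host)  # cudf.Series; implies CPU->GPU copy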
2 changes: 1 addition & 1 deletion src/akimbo/io.py
@@ -26,7 +26,7 @@ def ak_to_series(ds, backend="pandas", extract=True):
     else:
         raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
     if extract and ds.fields:
-        return s.ak.unmerge()
+        return s.ak.unpack()
     return s


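With extract=True (the default), ak_to_series now finishes with unpack, so record-typed awkward data arrives as dataframe columns. A short sketch using the function as defined here:

import awkward as ak

from akimbo.io import ak_to_series

ds = ak.Array([{"x": 1, "y": [1, 2]}, {"x": 2, "y": []}])
df = ak_to_series(ds, backend="pandas")  # ds has fields, so this returns an unpacked DataFrame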
58 changes: 47 additions & 11 deletions src/akimbo/mixin.py
@@ -8,14 +8,14 @@
 import pyarrow.compute as pc

 from akimbo.apply_tree import dec, match_any, numeric, run_with_transform
-from akimbo.utils import to_ak_layout
+from akimbo.utils import rec_list_swap, to_ak_layout

 methods = [
     _ for _ in (dir(ak)) if not _.startswith(("_", "ak_")) and not _[0].isupper()
 ] + ["apply", "array", "explode", "dt", "str"]

-df_methods = sorted(methods + ["merge"])
-series_methods = sorted(methods + ["unmerge"])
+df_methods = sorted(methods + ["pack"])
+series_methods = sorted(methods + ["unpack"])


def radd(left, right):
@@ -165,11 +165,19 @@ def is_dataframe(cls, data):
         return isinstance(data, cls.dataframe_type)

     @classmethod
-    def _to_output(cls, data):
-        # TODO: clarify protocol here; can data be in arrow already?
+    def _arrow_to_series(cls, data):
+        """How to make a series from arrow data"""
         raise NotImplementedError

+    @classmethod
+    def _to_output(cls, data):
+        """How to make a series from ak or arrow"""
+        if isinstance(data, ak.Array):
+            data = ak.to_arrow(data, extensionarray=False)
+        return cls._arrow_to_series(data)
+
     def to_output(self, data=None):
+        """Data returned as a series"""
         data = data if data is not None else self.array
         if not isinstance(data, Iterable):
             return data
@@ -313,26 +321,54 @@ def rename(self, where, to):
             parent.fields[this] = to
         return self.to_output(ak.Array(lay))

-    def merge(self):
+    def pack(self):
         """Make a single complex series out of the columns of a dataframe"""
         if not self.is_dataframe(self._obj):
-            raise ValueError("Can only merge on a dataframe")
+            raise ValueError("Can only pack on a dataframe")
         out = {}
         for k in self._obj.columns:
-            # TODO: partial merge when column names are like "record.field"
+            # TODO: partial pack when column names are like "record.field"
             out[k] = self._obj[k].ak.array
         arr = ak.Array(out)
         return self.to_output(arr)

-    def unmerge(self):
+    def unpack(self):
         """Make a dataframe out of a series of record type"""
+        # TODO: what to do when passed a dataframe; partial unpack of record fields?
         arr = self.array
         if not arr.fields:
             raise ValueError("Not array-of-records")
         # TODO: partial unmerge when (some) fields are records
         out = {k: self.to_output(arr[k]) for k in arr.fields}
         return self.dataframe_type(out)

+    def unexplode(self, *cols, outname="grouped"):
+        """Repack "exploded"-form dataframes into lists of structs.
+
+        This is the inverse of the regular dataframe explode() process.
+        """
+        # TODO: this does not work on cuDF, since here we use arrow directly
+        # TODO: pandas indexes are pre-grouped cat-like structures
+        cols = list(cols)
+        arr = self.arrow
+        if set(cols) - set(arr.column_names):
+            raise ValueError(
+                f"One or more grouping columns ({cols}) not in "
+                f"available columns {arr.column_names}"
+            )
+        outcols = [(_, "list") for _ in arr.column_names if _ not in cols]
+        if not outcols:
+            raise ValueError("Cannot group on all available columns")
+        outcols2 = [f"{_[0]}_list" for _ in outcols]
+        grouped = arr.group_by(cols).aggregate(outcols)
+        akarr = ak.from_arrow(grouped)
+        akarr2 = akarr[outcols2]
+        akarr2.layout._fields = [_[0] for _ in outcols]
+        struct = rec_list_swap(akarr2)
+        final = ak.with_field(akarr[cols], struct, outname)
+
+        return self._to_output(final).ak.unpack()
+
     def join(
         self,
         other,
@@ -394,7 +430,7 @@ def f(self, *args, **kw):
                 **kw,
             )
             if isinstance(self._obj, self.dataframe_type):
-                return out.ak.unmerge()
+                return out.ak.unpack()
             return out

         return f
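For orientation, unexplode's listify step is pyarrow's Table.group_by(...).aggregate([(col, "list")]), followed by a record-of-lists to list-of-records swap on the awkward side. A standalone sketch of that pipeline, with illustrative names only (tbl, records, out are not akimbo API):

import awkward as ak
import pyarrow as pa

tbl = pa.table({"x": [1, 1, 2], "y": [10, 11, 20]})
grouped = tbl.group_by(["x"]).aggregate([("y", "list")])  # columns: x, y_list
akarr = ak.from_arrow(grouped)
records = ak.zip({"y": akarr["y_list"]}, depth_limit=2)  # the rec_list_swap step
out = ak.with_field(akarr[["x"]], records, "grouped")
# out.tolist() -> [{'x': 1, 'grouped': [{'y': 10}, {'y': 11}]},
#                  {'x': 2, 'grouped': [{'y': 20}]}]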
6 changes: 2 additions & 4 deletions src/akimbo/pandas.py
@@ -35,10 +35,8 @@ def to_arrow(cls, data):
         return pa.table(data)

     @classmethod
-    def _to_output(cls, data):
-        return pd.Series(
-            pd.arrays.ArrowExtensionArray(ak.to_arrow(data, extensionarray=False))
-        )
+    def _arrow_to_series(cls, data):
+        return pd.Series(pd.arrays.ArrowExtensionArray(data))

     def to_output(self, data=None):
         # override to apply index
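The pandas backend now only wraps arrow data; the awkward-to-arrow step lives in the shared Accessor._to_output above. What the wrapping produces, as a sketch:

import pandas as pd
import pyarrow as pa

arr = pa.array([[1, 2], [], [3]])
s = pd.Series(pd.arrays.ArrowExtensionArray(arr))
# s.dtype -> list<item: int64>[pyarrow], backed directly by the arrow array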
9 changes: 6 additions & 3 deletions src/akimbo/polars.py
@@ -1,4 +1,3 @@
-import awkward as ak
 import polars as pl

 from akimbo.mixin import Accessor
@@ -13,9 +12,13 @@ class PolarsAwkwardAccessor(Accessor):
     dataframe_type = pl.DataFrame

     @classmethod
-    def _to_output(cls, arr):
-        return pl.from_arrow(ak.to_arrow(arr, extensionarray=False))
+    def _arrow_to_series(cls, arr):
+        return pl.from_arrow(arr)

     @classmethod
     def to_arrow(cls, data):
         return data.to_arrow()
+
+    def pack(self):
+        # polars already implements this directly
+        return self._obj.to_struct()
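The polars override works because polars has a native columns-to-struct operation, so pack() can skip the arrow round-trip entirely. Roughly:

import polars as pl

df = pl.DataFrame({"a": [1, 2], "b": ["x", "y"]})
s = df.to_struct("packed")  # struct-typed Series, as df.ak.pack() returns
# s.to_list() -> [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]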
9 changes: 9 additions & 0 deletions src/akimbo/utils.py
@@ -40,3 +40,12 @@ def to_ak_layout(ar):

 def match_string(*layout):
     return layout[0].is_list and layout[0].parameter("__array__") == "string"
+
+
+def rec_list_swap(arr: ak.Array, field: str | None = None) -> ak.Array:
+    """Make a record-of-lists into a list-of-records, assuming the lists have the same lengths"""
+    record_of_lists = arr[field] if field else arr
+    list_of_records = ak.zip(
+        dict(zip(ak.fields(record_of_lists), ak.unzip(record_of_lists))), depth_limit=2
+    )
+    return ak.with_field(arr, list_of_records, field) if field else list_of_records
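A quick check of rec_list_swap on equal-length lists (a sketch, using the function as defined above):

import awkward as ak

from akimbo.utils import rec_list_swap

arr = ak.Array([{"a": [1, 2], "b": [3, 4]}])
assert rec_list_swap(arr).tolist() == [[{"a": 1, "b": 3}, {"a": 2, "b": 4}]]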
39 changes: 39 additions & 0 deletions tests/test_pandas.py
@@ -122,3 +122,42 @@ def test_rename():

     s2 = s.ak.rename(("a", "b", "c"), "d")
     assert s2.tolist() == [{"a": [{"b": {"d": 0}}] * 2}] * 3


+def test_unexplode():
+    df = pd.DataFrame(
+        {
+            "x": [1, 1, 1, 2, 1, 3, 3, 1],
+            "y": [1, 1, 1, 2, 1, 3, 3, 1],
+            "z": [1, 1, 1, 2, 1, 3, 3, 2],
+        }
+    )
+    out = df.ak.unexplode("x")
+    compact = out["grouped"].tolist()
+    expected = [
+        [
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 2},
+        ],
+        [{"y": 2, "z": 2}],
+        [{"y": 3, "z": 3}, {"y": 3, "z": 3}],
+    ]
+    assert compact == expected
+
+    out = df.ak.unexplode("x", "y")
+    compact = out["grouped"].tolist()
+    expected = [
+        [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}],
+        [{"z": 2}],
+        [{"z": 3}, {"z": 3}],
+    ]
+    assert compact == expected
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("x", "y", "z")
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("unknown")
45 changes: 42 additions & 3 deletions tests/test_polars.py
@@ -28,16 +28,16 @@ def test_apply_where():
     assert s2[0] == {"a": [-1, -2, -3], "b": [1, 2, 3]}


-def test_merge_unmerge():
+def test_pack_unpack():
     data = [
         {"a": [1, 2, 3], "b": [1, 2, 3]},
         {"a": [1, 2, 3], "b": [1, 2, 3]},
         {"a": [1, 2, 3], "b": [1, 2, 3]},
     ]
     s = pl.Series(data)
-    df = s.ak.unmerge()
+    df = s.ak.unpack()
     assert df["a"].to_list() == [[1, 2, 3]] * 3
-    s2 = df.ak.merge()
+    s2 = df.ak.pack()
     assert s.to_list() == s2.to_list()


@@ -58,3 +58,42 @@ def test_ufunc():
df = pl.DataFrame({"a": s})
df2 = df.ak + 1
assert df2["a"].to_list() == [[2, 3, 4], [], [5, 6]]


+def test_unexplode():
+    df = pl.DataFrame(
+        {
+            "x": [1, 1, 1, 2, 1, 3, 3, 1],
+            "y": [1, 1, 1, 2, 1, 3, 3, 1],
+            "z": [1, 1, 1, 2, 1, 3, 3, 2],
+        }
+    )
+    out = df.ak.unexplode("x")
+    compact = out["grouped"].to_list()
+    expected = [
+        [
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 2},
+        ],
+        [{"y": 2, "z": 2}],
+        [{"y": 3, "z": 3}, {"y": 3, "z": 3}],
+    ]
+    assert compact == expected
+
+    out = df.ak.unexplode("x", "y")
+    compact = out["grouped"].to_list()
+    expected = [
+        [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}],
+        [{"z": 2}],
+        [{"z": 3}, {"z": 3}],
+    ]
+    assert compact == expected
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("x", "y", "z")
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("unknown")
