diff --git a/docs/demo/akimbo-demo.ipynb b/docs/demo/akimbo-demo.ipynb
index f06ae2f..7b992c1 100644
--- a/docs/demo/akimbo-demo.ipynb
+++ b/docs/demo/akimbo-demo.ipynb
@@ -945,7 +945,7 @@
    "id": "5c253d82-c1bf-4535-9e61-fa4b8bde799c",
    "metadata": {},
    "source": [
-    "We can convert back to an awkward Record array with merge:"
+    "We can convert back to an awkward Record array with pack:"
    ]
   },
   {
@@ -955,7 +955,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "merged = akimbo.merge(df[[\"run\", \"luminosityBlock\"]])"
+    "merged = akimbo.pack(df[[\"run\", \"luminosityBlock\"]])"
    ]
   },
   {
@@ -1024,7 +1024,7 @@
     }
    ],
    "source": [
-    "ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
+    "ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
    ]
   },
   {
@@ -1051,7 +1051,7 @@
     }
    ],
    "source": [
-    "ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
+    "ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
    ]
   },
   {
diff --git a/docs/quickstart.ipynb b/docs/quickstart.ipynb
index 48a5983..2046468 100644
--- a/docs/quickstart.ipynb
+++ b/docs/quickstart.ipynb
@@ -351,7 +351,7 @@
    "id": "d60887e8-582b-474f-a79f-173bc62c4bd1",
    "metadata": {},
    "source": [
-    "Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unmerge``."
+    "Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unpack``."
    ]
   },
   {
@@ -361,7 +361,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df = data.ak.unmerge()"
+    "df = data.ak.unpack()"
    ]
   },
   {
@@ -591,7 +591,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "s = df.ak.merge()"
+    "s = df.ak.pack()"
    ]
   },
   {
diff --git a/example/cudf-ak.ipynb b/example/cudf-ak.ipynb
index db5008d..8d82eba 100644
--- a/example/cudf-ak.ipynb
+++ b/example/cudf-ak.ipynb
@@ -273,7 +273,7 @@
     " 'typestr',\n",
     " 'typetracer',\n",
     " 'unflatten',\n",
-    " 'unmerge',\n",
+    " 'unpack',\n",
     " 'unzip',\n",
     " 'validity_error',\n",
     " 'values_astype',\n",
diff --git a/src/akimbo/apply_tree.py b/src/akimbo/apply_tree.py
index 9aed44d..a6531a7 100644
--- a/src/akimbo/apply_tree.py
+++ b/src/akimbo/apply_tree.py
@@ -8,11 +8,22 @@
 import pyarrow as pa
 
 
+def match_any(*layout, **_):
+    return True
+
+
 def leaf(*layout, **_):
     """True for the lowest elements of any akwward layout tree"""
     return layout[0].is_leaf
 
 
+def numeric(*layout, **_):
+    return layout[0].is_leaf and layout[0].parameters.get("__array__", None) not in {
+        "string",
+        "char",
+    }
+
+
 def run_with_transform(
     arr: ak.Array,
     op,
@@ -24,6 +35,8 @@
     **kw,
 ) -> ak.Array:
     def func(layout, **kwargs):
+        from akimbo.utils import match_string
+
         if not isinstance(layout, tuple):
             layout = (layout,)
         if all(match(lay, **(match_kwargs or {})) for lay in layout):
@@ -34,11 +47,23 @@ def func(layout, **kwargs):
             elif inmode == "numpy":
                 # works on numpy/cupy contents
                 out = op(*(lay.data for lay in layout), **kw, **(match_kwargs or {}))
-            else:
+            elif inmode == "ak":
                 out = op(*layout, **kw, **(match_kwargs or {}))
-            return outtype(out) if callable(outtype) else out
+            else:
+                out = op(
+                    *(ak.Array(lay) for lay in layout), **kw, **(match_kwargs or {})
+                )
+            if callable(outtype):
+                return outtype(out)
+            elif isinstance(out, ak.Array):
+                return out.layout
+            else:
+                return out
+        if match_string(*layout):
+            # non-string op may fail to descend into string
+            return layout[0]
 
-    return ak.transform(func, arr, *others)
+    return ak.transform(func, arr, *others, allow_records=True)
 
 
 def dec(
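# Illustration (not patch content): a minimal sketch of how the new matchers and
# the accessor-level ``transform`` added in mixin.py below fit together once this
# patch is applied, assuming the pandas backend; it mirrors test_manual_ufunc in
# the tests further down.
import awkward as ak
import pandas as pd

import akimbo.pandas  # noqa: F401  (registers the .ak accessor)
from akimbo.apply_tree import numeric

df = pd.DataFrame({"a": [["hey", "hi"], ["ho"]], "b": [[1, 2], [3]]})
# increment every numeric leaf; string leaves are skipped by the ``numeric`` matcher
out = df.ak.transform(
    lambda data: data + 1, match=numeric, inmode="numpy", outtype=ak.contents.NumpyArray
)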
diff --git a/src/akimbo/cudf.py b/src/akimbo/cudf.py
index 6ef07a0..f59f76d 100644
--- a/src/akimbo/cudf.py
+++ b/src/akimbo/cudf.py
@@ -47,7 +47,7 @@ def dec_cu(op, match=match_string):
     def f(lay, **kwargs):
         # op(column, ...)->column
         col = op(lay._to_cudf(cudf, None, len(lay)), **kwargs)
-        return from_cudf(cudf.Series(col)).layout
+        return from_cudf(cudf.Series._from_column(col)).layout
 
     return dec(func=f, match=match, inmode="ak")
 
@@ -61,7 +61,7 @@ def f(lay, method=meth, **kwargs):
         # this is different from dec_cu, because we need to instantiate StringMethods
         # before getting the method from it
         col = getattr(
-            StringMethods(cudf.Series(lay._to_cudf(cudf, None, len(lay)))), method
+            StringMethods(cudf.Series._from_column(lay._to_cudf(cudf, None, len(lay)))), method
         )(**kwargs)
         return from_cudf(col).layout
 
@@ -87,7 +87,7 @@ def f(lay, method=meth, **kwargs):
         else:
             # attributes giving components
             col = m
-        return from_cudf(cudf.Series(col)).layout
+        return from_cudf(cudf.Series._from_column(col)).layout
 
     if isinstance(getattr(DatetimeColumn, meth), property):
         setattr(
@@ -103,6 +103,11 @@ class CudfAwkwardAccessor(Accessor):
     series_type = Series
     dataframe_type = DataFrame
 
+    @classmethod
+    def _arrow_to_series(cls, data):
+        # this implies CPU->GPU copy
+        return Series(data)
+
     @classmethod
     def _to_output(cls, arr):
         if isinstance(arr, ak.Array):
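# Illustration (not patch content): this refactor splits series construction into
# a small per-backend hook.  A backend now only states how to wrap pyarrow data in
# its own series type; the shared Accessor._to_output (changed in mixin.py below)
# does the ak.Array -> arrow conversion first.  A toy, pandas-flavoured backend as
# a sketch only:
import pandas as pd

from akimbo.mixin import Accessor


class ToyAccessor(Accessor):
    series_type = pd.Series
    dataframe_type = pd.DataFrame

    @classmethod
    def _arrow_to_series(cls, data):
        # ``data`` is the pyarrow array produced by Accessor._to_output
        return pd.Series(pd.arrays.ArrowExtensionArray(data))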
diff --git a/src/akimbo/daft.py b/src/akimbo/daft.py
new file mode 100644
index 0000000..ee79767
--- /dev/null
+++ b/src/akimbo/daft.py
@@ -0,0 +1,94 @@
+import awkward as ak
+import daft
+from daft.udf import StatelessUDF
+
+from akimbo.apply_tree import run_with_transform
+from akimbo.mixin import Accessor
+
+
+class DaftAccessor(Accessor):
+    # this does not perform the operation, but makes a callable udf with
+    # the right annotation
+    def __getattr__(self, item):
+        func = getattr(ak, item)
+        return self._call_with_func(func)
+
+    def _call_with_func(self, func):
+        # todo: this should use run_with_transform
+
+        def closure(*args, **kwargs) -> callable:
+            not_series = [_ for _ in args if not isinstance(_, daft.DataFrame)]
+            other_series = [_ for _ in args if isinstance(_, daft.DataFrame)]
+
+            if not other_series:
+
+                def f(x: daft.Series) -> daft.Series:
+                    # unary
+                    arr = ak.from_arrow(x.to_arrow())
+                    ak_arr = ak.Array({x.name(): func(arr, *not_series, **kwargs)})
+                    if isinstance(ak_arr, ak.Array):
+                        # like _to_output
+                        return daft.Series.from_arrow(
+                            ak.to_arrow(ak_arr, extensionarray=False)
+                        )
+                    return ak_arr
+
+            else:
+
+                def f(x: daft.Series, y: daft.Series) -> daft.Series:
+                    # binary
+                    arrx = ak.from_arrow(x.to_arrow())
+                    arry = ak.from_arrow(y.to_arrow())
+
+                    ak_arr = ak.Array(
+                        {x.name(): func(arrx, arry, *not_series, **kwargs)}
+                    )
+                    if isinstance(ak_arr, ak.Array):
+                        # like _to_output
+                        return daft.Series.from_arrow(
+                            ak.to_arrow(ak_arr, extensionarray=False)
+                        )
+                    return ak_arr
+
+            schema = self._obj.schema().to_pyarrow_schema()
+            ak_schema = ak.from_arrow_schema(schema)
+            arr = ak.Array(ak_schema.length_zero_array())  # typetracer??
+            out = func(arr)
+            outtype = daft.DataType.from_arrow_type(
+                ak.to_arrow(out, extensionarray=False).type
+            )
+            udf = StatelessUDF(
+                name=func.__name__,
+                func=f,
+                return_dtype=outtype,
+                resource_request=None,
+                batch_size=None,
+            )
+            return udf
+
+        return closure
+
+    def __array_ufunc__(self, *args, **kwargs):
+        # ufuncs
+        if args[1] == "__call__":
+            return self._call_with_func(args[0])(self, **kwargs)
+        raise NotImplementedError
+
+    @classmethod
+    def _create_op(cls, op):
+        # dunder methods
+
+        def run(self, *args, **kwargs):
+            # closure =
+            return cls._call_with_func()
+
+        return run
+
+
+@property  # type:ignore
+def ak_property(self):
+    return DaftAccessor(self)
+
+
+daft.DataFrame.ak = ak_property
+daft.Series.ak = ak_property
diff --git a/src/akimbo/io.py b/src/akimbo/io.py
index 9c75230..afaf496 100644
--- a/src/akimbo/io.py
+++ b/src/akimbo/io.py
@@ -26,7 +26,7 @@ def ak_to_series(ds, backend="pandas", extract=True):
     else:
         raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
     if extract and ds.fields:
-        return s.ak.unmerge()
+        return s.ak.unpack()
     return s
diff --git a/src/akimbo/mixin.py b/src/akimbo/mixin.py
index 82faf73..227a52b 100644
--- a/src/akimbo/mixin.py
+++ b/src/akimbo/mixin.py
@@ -7,14 +7,15 @@
 import awkward as ak
 import pyarrow.compute as pc
 
-from akimbo.apply_tree import dec
+from akimbo.apply_tree import dec, match_any, numeric, run_with_transform
+from akimbo.utils import rec_list_swap, to_ak_layout
 
 methods = [
     _ for _ in (dir(ak)) if not _.startswith(("_", "ak_")) and not _[0].isupper()
 ] + ["apply", "array", "explode", "dt", "str"]
 
-df_methods = sorted(methods + ["merge"])
-series_methods = sorted(methods + ["unmerge"])
+df_methods = sorted(methods + ["pack"])
+series_methods = sorted(methods + ["unpack"])
 
 
 def radd(left, right):
@@ -164,11 +165,19 @@ def is_dataframe(cls, data):
         return isinstance(data, cls.dataframe_type)
 
     @classmethod
-    def _to_output(cls, data):
-        # TODO: clarify protocol here; can data be in arrow already?
+    def _arrow_to_series(cls, data):
+        """How to make a series from arrow data"""
        raise NotImplementedError
 
+    @classmethod
+    def _to_output(cls, data):
+        """How to make a series from ak or arrow"""
+        if isinstance(data, ak.Array):
+            data = ak.to_arrow(data, extensionarray=False)
+        return cls._arrow_to_series(data)
+
     def to_output(self, data=None):
+        """Data returned as a series"""
         data = data if data is not None else self.array
         if not isinstance(data, Iterable):
             return data
@@ -179,6 +188,9 @@ def apply(self, fn: Callable, where=None, **kwargs):
 
         The function should take an ak array as input and produce an
         ak array or scalar.
+
+        Unlike ``transform``, the function takes and returns ak.Array instances
+        and acts on a whole schema tree.
         """
         if where:
             bits = tuple(where.split("."))
@@ -190,6 +202,44 @@ def apply(self, fn: Callable, where=None, **kwargs):
             final = fn(self.array)
         return self.to_output(final)
 
+    def transform(
+        self, fn: Callable, *others, where=None, match=match_any, inmode="ak", **kwargs
+    ):
+        """Apply an arbitrary function to selected parts of the data tree
+
+        This process walks through the data's schema tree, and applies the given
+        function only on the matching nodes.
+
+        Parameters
+        ----------
+        fn: the operation you want to perform. Typically unary or binary, and may take
+            extra kwargs
+        others: extra arguments, perhaps other akimbo series
+        where: path in the schema tree to apply this
+        match: when walking the schema, this determines if a node should be processed;
+            it will be a function taking one or more ak.contents classes. akimbo.apply_tree
+            contains convenience matchers match_any, leaf and numeric, and more matchers
+            can be found in the string and datetime modules
+        inmode: data should be passed to the given function as:
+            "arrow" | "numpy" (includes cupy) | "ak" layout | "array" high-level ak.Array
+        kwargs: passed to the operation, except those that are taken by ``run_with_transform``.
+        """
+        if where:
+            bits = tuple(where.split("."))
+            arr = self.array
+            part = arr.__getitem__(bits)
+            # TODO: apply ``where`` to any arrays in others
+            # other = [to_ak_layout(ar) for ar in others]
+            out = run_with_transform(
+                part, fn, match=match, others=others, inmode=inmode, **kwargs
+            )
+            final = ak.with_field(arr, out, where=where)
+        else:
+            final = run_with_transform(
+                self.array, fn, match=match, others=others, inmode=inmode, **kwargs
+            )
+        return self.to_output(final)
+
     def __getitem__(self, item):
         out = self.array.__getitem__(item)
         return self.to_output(out)
@@ -271,26 +321,54 @@ def rename(self, where, to):
             parent.fields[this] = to
         return self.to_output(ak.Array(lay))
 
-    def merge(self):
+    def pack(self):
         """Make a single complex series out of the columns of a dataframe"""
         if not self.is_dataframe(self._obj):
-            raise ValueError("Can only merge on a dataframe")
+            raise ValueError("Can only pack on a dataframe")
         out = {}
         for k in self._obj.columns:
-            # TODO: partial merge when column names are like "record.field"
+            # TODO: partial pack when column names are like "record.field"
             out[k] = self._obj[k].ak.array
         arr = ak.Array(out)
         return self.to_output(arr)
 
-    def unmerge(self):
+    def unpack(self):
         """Make dataframe out of a series of record type"""
+        # TODO: what to do when passed a dataframe, partial unpack of record fields?
         arr = self.array
         if not arr.fields:
             raise ValueError("Not array-of-records")
-        # TODO: partial unmerge when (some) fields are records
         out = {k: self.to_output(arr[k]) for k in arr.fields}
         return self.dataframe_type(out)
 
+    def unexplode(self, *cols, outname="grouped"):
+        """Repack "exploded" form dataframes into lists of structs
+
+        This is the inverse of the regular dataframe explode() process.
+        """
+        # TODO: this does not work on cuDF as here we use arrow directly
+        # TODO: pandas indexes are pre-grouped cat-like structures
+        cols = list(cols)
+        arr = self.arrow
+        if set(cols) - set(arr.column_names):
+            raise ValueError(
+                "One or more grouping columns (%s) not in available columns %s",
+                cols,
+                arr.column_names,
+            )
+        outcols = [(_, "list") for _ in arr.column_names if _ not in cols]
+        if not outcols:
+            raise ValueError("Cannot group on all available columns")
+        outcols2 = [f"{_[0]}_list" for _ in outcols]
+        grouped = arr.group_by(cols).aggregate(outcols)
+        akarr = ak.from_arrow(grouped)
+        akarr2 = akarr[outcols2]
+        akarr2.layout._fields = [_[0] for _ in outcols]
+        struct = rec_list_swap(akarr2)
+        final = ak.with_field(akarr[cols], struct, outname)
+
+        return self._to_output(final).ak.unpack()
+
     def join(
         self,
         other,
@@ -331,12 +409,31 @@ def join(
     def _create_op(cls, op):
         """Make functions to perform all the arithmetic, logical and comparison ops"""
 
-        def run(self, *args, **kwargs):
-            ar2 = (ar.ak.array if hasattr(ar, "ak") else ar for ar in args)
-            ar3 = (ar.array if isinstance(ar, cls) else ar for ar in ar2)
-            return self.to_output(op(self.array, *ar3, **kwargs))
+        def op2(*args, extra=None, **kw):
+            args = list(args) + list(extra or [])
+            return op(*args, **kw)
+
+        def f(self, *args, **kw):
+            # TODO: test here is for literals, but really we want "don't know how to
+            # array that" condition
+            extra = (_ for _ in args if isinstance(_, (str, int, float)))
+            args = (
+                to_ak_layout(_) for _ in args if not isinstance(_, (str, int, float))
+            )
+            out = self.transform(
+                op2,
+                *args,
+                match=numeric,
+                inmode="numpy",
+                extra=extra,
+                outtype=ak.contents.NumpyArray,
+                **kw,
+            )
+            if isinstance(self._obj, self.dataframe_type):
+                return out.ak.unpack()
+            return out
 
-        return run
+        return f
 
     def __getattr__(self, item):
         arr = self.array
diff --git a/src/akimbo/pandas.py b/src/akimbo/pandas.py
index 7946375..996e6ae 100644
--- a/src/akimbo/pandas.py
+++ b/src/akimbo/pandas.py
@@ -35,10 +35,8 @@ def to_arrow(cls, data):
         return pa.table(data)
 
     @classmethod
-    def _to_output(cls, data):
-        return pd.Series(
-            pd.arrays.ArrowExtensionArray(ak.to_arrow(data, extensionarray=False))
-        )
+    def _arrow_to_series(cls, data):
+        return pd.Series(pd.arrays.ArrowExtensionArray(data))
 
     def to_output(self, data=None):
         # override to apply index
diff --git a/src/akimbo/polars.py b/src/akimbo/polars.py
index ffc3811..db20ab9 100644
--- a/src/akimbo/polars.py
+++ b/src/akimbo/polars.py
@@ -1,4 +1,3 @@
-import awkward as ak
 import polars as pl
 
 from akimbo.mixin import Accessor
@@ -13,9 +12,13 @@ class PolarsAwkwardAccessor(Accessor):
     dataframe_type = pl.DataFrame
 
     @classmethod
-    def _to_output(cls, arr):
-        return pl.from_arrow(ak.to_arrow(arr, extensionarray=False))
+    def _arrow_to_series(cls, arr):
+        return pl.from_arrow(arr)
 
     @classmethod
     def to_arrow(cls, data):
         return data.to_arrow()
+
+    def pack(self):
+        # polars already implements this directly
+        return self._obj.to_struct()
diff --git a/src/akimbo/spark.py b/src/akimbo/spark.py
new file mode 100644
index 0000000..1807423
--- /dev/null
+++ b/src/akimbo/spark.py
@@ -0,0 +1,35 @@
+from typing import Iterable
+
+import awkward as ak
+import pandas as pd
+import pyspark
+from pyspark.sql.functions import col, pandas_udf
+
+import akimbo.pandas
+from akimbo.mixin import Accessor
+
+# https://docs.databricks.com/en/udf/pandas.html
+
+
+class SparkAccessor(Accessor):
+    def __dir__(self) -> Iterable[str]:
+        return dir(pd.DataFrame.ak)
+
+    def __getattr__(self, item):
+        @pandas_udf(returnType=pyspark.sql.types.BooleanType())
+        def run(x: pd.Series) -> pd.Series:
+            import akimbo.pandas
+
+            print(x)
+            out = x.ak.is_none()
+            return out
+
+        return self._obj.select(run("x"))
+
+
+@property  # type:ignore
+def ak_property(self):
+    return SparkAccessor(self)
+
+
+pyspark.sql.DataFrame.ak = ak_property
diff --git a/src/akimbo/strings.py b/src/akimbo/strings.py
index 6e7fcb2..1460154 100644
--- a/src/akimbo/strings.py
+++ b/src/akimbo/strings.py
@@ -8,10 +8,7 @@
 from akimbo.apply_tree import dec
 from akimbo.mixin import Accessor
-
-
-def match_string(*layout):
-    return layout[0].is_list and layout[0].parameter("__array__") == "string"
+from akimbo.utils import match_string
 
 
 def _encode(layout):
@@ -53,14 +50,21 @@ def _decode(layout):
 
 # make sensible defaults for strptime
 strptime = functools.wraps(pc.strptime)(
-    lambda *args, format="%FT%T", unit="s", error_is_null=True, **kw:
-    pc.strptime(*args, format=format, unit=unit, error_is_null=error_is_null)
+    lambda *args, format="%FT%T", unit="s", error_is_null=True, **kw: pc.strptime(
+        *args, format=format, unit=unit, error_is_null=error_is_null
+    )
 )
 
 
 class StringAccessor:
     """String operations on nested/var-length data"""
 
+    # TODO: implement dunder add (concat strings) and mul (repeat strings)
+    # - s.ak.str + "suffix" (and arguments swapped)
+    # - s.ak.str + s2.ak.str (with matching schemas)
+    # - s.ak.str * N (and arguments swapped)
+    # - s.ak.str * s (where each string maps to integers for variable repeats)
+
     def __init__(self, accessor):
         self.accessor = accessor
 
diff --git a/src/akimbo/utils.py b/src/akimbo/utils.py
index cfda842..de0dfce 100644
--- a/src/akimbo/utils.py
+++ b/src/akimbo/utils.py
@@ -1,3 +1,8 @@
+from __future__ import annotations
+
+import awkward as ak
+
+
 class NoAttributes:
     """Allows importing akimbo.cudf even if cudf isn't installed
 
@@ -20,3 +25,27 @@ def __call__(self, *args, **kwargs):
     __name__ = "DummyAttributesObject"
     __doc__ = None
     __annotations__ = None
+
+
+def to_ak_layout(ar):
+    if hasattr(ar, "ak"):
+        return ar.ak.array
+    elif hasattr(ar, "array"):
+        return ar.array
+    elif isinstance(ar, (ak.Array)):
+        return ar
+    else:
+        return ak.Array(ak.to_layout(ar))
+
+
+def match_string(*layout):
+    return layout[0].is_list and layout[0].parameter("__array__") == "string"
+
+
+def rec_list_swap(arr: ak.Array, field: str | None = None) -> ak.Array:
+    """Make a record-of-lists into a list-of-records, assuming the lists have the same lengths"""
+    record_of_lists = arr[field] if field else arr
+    list_of_records = ak.zip(
+        dict(zip(ak.fields(record_of_lists), ak.unzip(record_of_lists))), depth_limit=2
+    )
+    return ak.with_field(arr, list_of_records, field) if field else list_of_records
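# Illustration (not patch content): what rec_list_swap above does on a small
# record-of-lists array (the lists must have equal lengths within each row).
import awkward as ak

from akimbo.utils import rec_list_swap

arr = ak.Array({"y": [[1, 1], [2]], "z": [[3, 4], [5]]})  # record of lists
rows = rec_list_swap(arr)  # list of records
# rows.tolist() == [[{"y": 1, "z": 3}, {"y": 1, "z": 4}], [{"y": 2, "z": 5}]]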
diff --git a/tests/test_pandas.py b/tests/test_pandas.py
index 13ae2b4..4c3a22a 100644
--- a/tests/test_pandas.py
+++ b/tests/test_pandas.py
@@ -49,6 +49,58 @@ def test_ufunc():
     assert (s.ak + s.ak).tolist() == [[2, 4, 6], [8, 10], [12]]
     assert (s.ak + s).tolist() == [[2, 4, 6], [8, 10], [12]]
 
+    s = pd.DataFrame({"a": s})
+    assert (s.ak + 1).a.tolist() == [[2, 3, 4], [5, 6], [7]]
+
+    assert (s.ak + s.ak).a.tolist() == [[2, 4, 6], [8, 10], [12]]
+    assert (s.ak + s).a.tolist() == [[2, 4, 6], [8, 10], [12]]
+
+
+def test_manual_ufunc():
+    from akimbo.apply_tree import numeric
+
+    df = pd.DataFrame(
+        {"a": [["hey", "hi", "ho"], [None], ["blah"]], "b": [[1, 2, 3], [4, 5], [6]]}
+    )
+    df2 = df.ak.transform(
+        lambda x: x + 1, match=numeric, inmode="numpy", outtype=ak.contents.NumpyArray
+    )
+    expected = [
+        {"a": ["hey", "hi", "ho"], "b": [2, 3, 4]},
+        {"a": [None], "b": [5, 6]},
+        {"a": ["blah"], "b": [7]},
+    ]
+    assert df2.tolist() == expected
+
+
+def test_mixed_ufunc():
+    # ufuncs are numeric only by default, doesn't touch strings
+    df = pd.DataFrame(
+        {"a": [["hey", "hi", "ho"], [None], ["blah"]], "b": [[1, 2, 3], [4, 5], [6]]}
+    )
+    df2 = df.ak + 1
+    expected = [
+        {"a": ["hey", "hi", "ho"], "b": [2, 3, 4]},
+        {"a": [None], "b": [5, 6]},
+        {"a": ["blah"], "b": [7]},
+    ]
+    assert df2.ak.tolist() == expected
+
+    df2 = df.ak * 2
+    expected = [
+        {"a": ["hey", "hi", "ho"], "b": [2, 4, 6]},
+        {"a": [None], "b": [8, 10]},
+        {"a": ["blah"], "b": [12]},
+    ]
+    assert df2.ak.tolist() == expected
+    df2 = 2 * df.ak
+    assert df2.ak.tolist() == expected
+
+    df2 = df.ak == df.ak
+    expected = [[True, True, True], [True, True], [True]]
+    assert df2["b"].tolist() == expected
+    assert df2["a"].tolist() == df["a"].tolist()
+
 
 
 def test_to_autoarrow():
     a = [[1, 2, 3], [4, 5], [6]]
@@ -70,3 +122,42 @@ def test_rename():
 
     s2 = s.ak.rename(("a", "b", "c"), "d")
     assert s2.tolist() == [{"a": [{"b": {"d": 0}}] * 2}] * 3
+
+
+def test_unexplode():
+    df = pd.DataFrame(
+        {
+            "x": [1, 1, 1, 2, 1, 3, 3, 1],
+            "y": [1, 1, 1, 2, 1, 3, 3, 1],
+            "z": [1, 1, 1, 2, 1, 3, 3, 2],
+        }
+    )
+    out = df.ak.unexplode("x")
+    compact = out["grouped"].tolist()
+    expected = [
+        [
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 2},
+        ],
+        [{"y": 2, "z": 2}],
+        [{"y": 3, "z": 3}, {"y": 3, "z": 3}],
+    ]
+    assert compact == expected
+
+    out = df.ak.unexplode("x", "y")
+    compact = out["grouped"].tolist()
+    expected = [
+        [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}],
+        [{"z": 2}],
+        [{"z": 3}, {"z": 3}],
+    ]
+    assert compact == expected
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("x", "y", "z")
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("unknown")
diff --git a/tests/test_polars.py b/tests/test_polars.py
index 56b5d93..d2d0f23 100644
--- a/tests/test_polars.py
+++ b/tests/test_polars.py
@@ -28,16 +28,16 @@ def test_apply_where():
     assert s2[0] == {"a": [-1, -2, -3], "b": [1, 2, 3]}
 
 
-def test_merge_unmerge():
+def test_pack_unpack():
     data = [
         {"a": [1, 2, 3], "b": [1, 2, 3]},
         {"a": [1, 2, 3], "b": [1, 2, 3]},
         {"a": [1, 2, 3], "b": [1, 2, 3]},
     ]
     s = pl.Series(data)
-    df = s.ak.unmerge()
+    df = s.ak.unpack()
     assert df["a"].to_list() == [[1, 2, 3]] * 3
-    s2 = df.ak.merge()
+    s2 = df.ak.pack()
     assert s.to_list() == s2.to_list()
 
 
@@ -54,3 +54,46 @@ def test_ufunc():
 
     s2 = np.add(s.ak, 1)
     assert s2.to_list() == [[2, 3, 4], [], [5, 6]]
+
+    df = pl.DataFrame({"a": s})
+    df2 = df.ak + 1
+    assert df2["a"].to_list() == [[2, 3, 4], [], [5, 6]]
+
+
+def test_unexplode():
+    df = pl.DataFrame(
+        {
+            "x": [1, 1, 1, 2, 1, 3, 3, 1],
+            "y": [1, 1, 1, 2, 1, 3, 3, 1],
+            "z": [1, 1, 1, 2, 1, 3, 3, 2],
+        }
+    )
+    out = df.ak.unexplode("x")
+    compact = out["grouped"].to_list()
+    expected = [
+        [
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 2},
+        ],
+        [{"y": 2, "z": 2}],
+        [{"y": 3, "z": 3}, {"y": 3, "z": 3}],
+    ]
+    assert compact == expected
+
+    out = df.ak.unexplode("x", "y")
+    compact = out["grouped"].to_list()
+    expected = [
+        [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}],
+        [{"z": 2}],
+        [{"z": 3}, {"z": 3}],
+    ]
+    assert compact == expected
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("x", "y", "z")
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("unknown")
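# Illustration (not patch content): the behaviour the unexplode tests above check,
# shown as a small round trip on the pandas backend.  unexplode() regroups the
# non-key columns into a list-of-structs column, inverting DataFrame.explode().
import pandas as pd

import akimbo.pandas  # noqa: F401

df = pd.DataFrame({"x": [1, 1, 2], "y": [10, 20, 30]})
out = df.ak.unexplode("x")
# out has one row per distinct "x"; out["grouped"] holds [{y: 10}, {y: 20}] and [{y: 30}]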
diff --git a/tests/test_spark.py b/tests/test_spark.py
new file mode 100644
index 0000000..2a9a7f0
--- /dev/null
+++ b/tests/test_spark.py
@@ -0,0 +1,19 @@
+import pytest
+
+pd = pytest.importorskip("pandas")
+pyspark = pytest.importorskip("pyspark")
+import akimbo.spark
+
+
+@pytest.fixture(scope="module")
+def spark():
+    from pyspark.sql import SparkSession
+
+    return SparkSession.builder.appName("test").getOrCreate()
+
+
+def test1(spark):
+    x = pd.Series([1, 2, 3])
+    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
+    out = df.ak.is_none.collect()
+    assert out.tolist() == [False, False, False]
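# Illustration (not patch content): the notebook and docstring edits near the top
# are the merge/unmerge -> pack/unpack rename; with this patch applied, the round
# trip on the pandas backend reads:
import pandas as pd

import akimbo.pandas  # noqa: F401

df = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]})
s = df.ak.pack()  # one series of records (was df.ak.merge())
df2 = s.ak.unpack()  # back to a dataframe (was s.ak.unmerge())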