Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add daft and spark #81

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/demo/akimbo-demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@
"id": "5c253d82-c1bf-4535-9e61-fa4b8bde799c",
"metadata": {},
"source": [
"We can convert back to an awkward Record array with merge:"
"We can convert back to an awkward Record array with pack:"
]
},
{
Expand All @@ -955,7 +955,7 @@
"metadata": {},
"outputs": [],
"source": [
"merged = akimbo.merge(df[[\"run\", \"luminosityBlock\"]])"
"merged = akimbo.pack(df[[\"run\", \"luminosityBlock\"]])"
]
},
{
Expand Down Expand Up @@ -1024,7 +1024,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
]
},
{
Expand All @@ -1051,7 +1051,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
]
},
{
Expand Down
6 changes: 3 additions & 3 deletions docs/quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@
"id": "d60887e8-582b-474f-a79f-173bc62c4bd1",
"metadata": {},
"source": [
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unmerge``."
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unpack``."
]
},
{
Expand All @@ -361,7 +361,7 @@
"metadata": {},
"outputs": [],
"source": [
"df = data.ak.unmerge()"
"df = data.ak.unpack()"
]
},
{
Expand Down Expand Up @@ -591,7 +591,7 @@
"metadata": {},
"outputs": [],
"source": [
"s = df.ak.merge()"
"s = df.ak.pack()"
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion example/cudf-ak.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@
" 'typestr',\n",
" 'typetracer',\n",
" 'unflatten',\n",
" 'unmerge',\n",
" 'unpack',\n",
" 'unzip',\n",
" 'validity_error',\n",
" 'values_astype',\n",
Expand Down
31 changes: 28 additions & 3 deletions src/akimbo/apply_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,22 @@
import pyarrow as pa


def match_any(*layout, **_):
    """Matcher that accepts every layout node unconditionally."""
    return True


def leaf(*layout, **_):
    """True for the lowest elements of any awkward layout tree."""
    # Only the first layout in the tuple is inspected; extra layouts and
    # keyword arguments are accepted for matcher-signature compatibility.
    return layout[0].is_leaf


def numeric(*layout, **_):
    """Match leaf nodes that are not string-like (``string``/``char``)."""
    first = layout[0]
    if not first.is_leaf:
        return False
    # String data is flagged via the "__array__" layout parameter.
    return first.parameters.get("__array__", None) not in ("string", "char")


def run_with_transform(
arr: ak.Array,
op,
Expand All @@ -24,6 +35,8 @@ def run_with_transform(
**kw,
) -> ak.Array:
def func(layout, **kwargs):
from akimbo.utils import match_string

if not isinstance(layout, tuple):
layout = (layout,)
if all(match(lay, **(match_kwargs or {})) for lay in layout):
Expand All @@ -34,11 +47,23 @@ def func(layout, **kwargs):
elif inmode == "numpy":
# works on numpy/cupy contents
out = op(*(lay.data for lay in layout), **kw, **(match_kwargs or {}))
else:
elif inmode == "ak":
out = op(*layout, **kw, **(match_kwargs or {}))
return outtype(out) if callable(outtype) else out
else:
out = op(
*(ak.Array(lay) for lay in layout), **kw, **(match_kwargs or {})
)
if callable(outtype):
return outtype(out)
elif isinstance(out, ak.Array):
return out.layout
else:
return out
if match_string(*layout):
# non-string op may fail to descend into string
return layout[0]

return ak.transform(func, arr, *others)
return ak.transform(func, arr, *others, allow_records=True)


def dec(
Expand Down
11 changes: 8 additions & 3 deletions src/akimbo/cudf.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def dec_cu(op, match=match_string):
def f(lay, **kwargs):
# op(column, ...)->column
col = op(lay._to_cudf(cudf, None, len(lay)), **kwargs)
return from_cudf(cudf.Series(col)).layout
return from_cudf(cudf.Series._from_column(col)).layout

return dec(func=f, match=match, inmode="ak")

Expand All @@ -61,7 +61,7 @@ def f(lay, method=meth, **kwargs):
# this is different from dec_cu, because we need to instantiate StringMethods
# before getting the method from it
col = getattr(
StringMethods(cudf.Series(lay._to_cudf(cudf, None, len(lay)))), method
StringMethods(cudf.Series._from_column(lay._to_cudf(cudf, None, len(lay)))), method
)(**kwargs)
return from_cudf(col).layout

Expand All @@ -87,7 +87,7 @@ def f(lay, method=meth, **kwargs):
else:
# attributes giving components
col = m
return from_cudf(cudf.Series(col)).layout
return from_cudf(cudf.Series._from_column(col)).layout

if isinstance(getattr(DatetimeColumn, meth), property):
setattr(
Expand All @@ -103,6 +103,11 @@ class CudfAwkwardAccessor(Accessor):
series_type = Series
dataframe_type = DataFrame

    @classmethod
    def _arrow_to_series(cls, data):
        """Wrap arrow data in a cudf Series.

        Note: this implies a CPU->GPU copy.
        """
        return Series(data)

@classmethod
def _to_output(cls, arr):
if isinstance(arr, ak.Array):
Expand Down
94 changes: 94 additions & 0 deletions src/akimbo/daft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import awkward as ak
import daft
from daft.udf import StatelessUDF

from akimbo.apply_tree import run_with_transform
from akimbo.mixin import Accessor


class DaftAccessor(Accessor):
    """Awkward-array accessor for daft DataFrames and Series.

    Attribute access does not perform the operation eagerly; it builds a
    stateless UDF carrying the correct return-dtype annotation, which daft
    then applies lazily through its expression machinery.
    """

    def __getattr__(self, item):
        # Resolve unknown attributes against the top-level awkward namespace.
        func = getattr(ak, item)
        return self._call_with_func(func)

    def _call_with_func(self, func):
        # todo: this should use run_with_transform

        def closure(*args, **kwargs) -> callable:
            # Split positional args into plain values and daft containers.
            not_series = [_ for _ in args if not isinstance(_, daft.DataFrame)]
            other_series = [_ for _ in args if isinstance(_, daft.DataFrame)]

            if not other_series:

                def f(x: daft.Series) -> daft.Series:
                    # unary
                    arr = ak.from_arrow(x.to_arrow())
                    ak_arr = ak.Array({x.name(): func(arr, *not_series, **kwargs)})
                    if isinstance(ak_arr, ak.Array):
                        # like _to_output
                        return daft.Series.from_arrow(
                            ak.to_arrow(ak_arr, extensionarray=False)
                        )
                    return ak_arr

            else:

                def f(x: daft.Series, y: daft.Series) -> daft.Series:
                    # binary
                    arrx = ak.from_arrow(x.to_arrow())
                    arry = ak.from_arrow(y.to_arrow())

                    # BUG FIX: original referenced undefined names
                    # arr_x/arr_y (NameError at call time)
                    ak_arr = ak.Array(
                        {x.name(): func(arrx, arry, *not_series, **kwargs)}
                    )
                    if isinstance(ak_arr, ak.Array):
                        # like _to_output
                        return daft.Series.from_arrow(
                            ak.to_arrow(ak_arr, extensionarray=False)
                        )
                    return ak_arr

            # Infer the UDF's return dtype by running the op on a
            # length-zero array built from the input schema (typetracer??)
            schema = self._obj.schema().to_pyarrow_schema()
            ak_schema = ak.from_arrow_schema(schema)
            arr = ak.Array(ak_schema.length_zero_array())
            out = func(arr)
            outtype = daft.DataType.from_arrow_type(
                ak.to_arrow(out, extensionarray=False).type
            )
            udf = StatelessUDF(
                name=func.__name__,
                func=f,
                return_dtype=outtype,
                resource_request=None,
                batch_size=None,
            )
            return udf

        return closure

    def __array_ufunc__(self, *args, **kwargs):
        # ufuncs: args[0] is the ufunc object, args[1] the method name
        if args[1] == "__call__":
            return self._call_with_func(args[0])(self, **kwargs)
        raise NotImplementedError

    @classmethod
    def _create_op(cls, op):
        # dunder methods (operators)

        def run(self, *args, **kwargs):
            # BUG FIX: original called cls._call_with_func() without the
            # operator and returned the closure factory without invoking it.
            # NOTE(review): assumes daft containers/plain operands are the
            # positional args -- confirm against Accessor._create_op callers.
            return self._call_with_func(op)(*args, **kwargs)

        return run


@property  # type:ignore
def ak_property(self):
    # Accessor entry point; attached to daft.DataFrame.ak / daft.Series.ak
    # below, so `self` is the calling daft container.
    return DaftAccessor(self)


# Register the `.ak` accessor on both daft container types.
daft.DataFrame.ak = ak_property
daft.Series.ak = ak_property
2 changes: 1 addition & 1 deletion src/akimbo/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def ak_to_series(ds, backend="pandas", extract=True):
else:
raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
if extract and ds.fields:
return s.ak.unmerge()
return s.ak.unpack()
return s


Expand Down
Loading
Loading