Skip to content

Commit

Permalink
Add user defined window function support (#880)
Browse files Browse the repository at this point in the history
* Adding PyWindowUDF and implementing PartitionEvaluator for it. Still requires python side work.

* Add python wrappers for UDWF

* adding unit tests for user defined window functions

* Change udwf() to take an instance rather than a class so we can parameterize it

* Pass multiple arrays for udwf evaluate so we can capture the order_by and also multiple columns

* Update udwf to take multiple input columns

* Add user exampe for UDWF

* Update template for how values are passed to update

* Add user documentation for UDWF

* Updating documentation per PR review
  • Loading branch information
timsaucer authored Sep 30, 2024
1 parent f822495 commit 022e4b3
Show file tree
Hide file tree
Showing 9 changed files with 1,306 additions and 31 deletions.
194 changes: 176 additions & 18 deletions docs/source/user-guide/common-operations/udf-and-udfa.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,24 @@
.. specific language governing permissions and limitations
.. under the License.
User Defined Functions
User-Defined Functions
======================

DataFusion provides powerful expressions and functions, reducing the need for custom Python functions.
However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs), with the :py:func:`~datafusion.udf.ScalarUDF.udf` function.
DataFusion provides powerful expressions and functions, reducing the need for custom Python
functions. However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs).

Scalar Functions
----------------

When writing a user-defined function that can operate on a row by row basis, these are called Scalar
Functions. You can define your own scalar function by calling
:py:func:`~datafusion.udf.ScalarUDF.udf` .

The basic definition of a scalar UDF is a python function that takes one or more
`pyarrow <https://arrow.apache.org/docs/python/index.html>`_ arrays and returns a single array as
output. DataFusion scalar UDFs operate on an entire batch of records at a time, though the
evaluation of those records should be on a row by row basis. In the following example, we compute
if the input array contains null values.

.. ipython:: python
Expand All @@ -35,14 +48,67 @@ However you can still incorporate your own functions, i.e. User-Defined Function
ctx = datafusion.SessionContext()
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
[pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]], name="batch_array")
df.select(is_null_arr(col("a"))).to_pandas()
df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()
In the previous example, we used the fact that pyarrow provides a variety of built in array
functions such as ``is_null()``. There are additional pyarrow
`compute functions <https://arrow.apache.org/docs/python/compute.html>`_ available. When possible,
it is highly recommended to use these functions because they can perform computations without doing
any copy operations from the original arrays. This leads to greatly improved performance.

If you need to perform an operation in python that is not available with the pyarrow compute
functions, you will need to convert the record batch into python values, perform your operation,
and construct an array. This operation of converting the built in data type of the array into a
python object can be one of the slowest operations in DataFusion, so it should be done sparingly.

The following example performs the same operation as before with ``is_null`` but demonstrates
converting to Python objects to do the evaluation.

.. ipython:: python
import pyarrow
import datafusion
from datafusion import udf, col
def is_null(array: pyarrow.Array) -> pyarrow.Array:
return pyarrow.array([value.as_py() is None for value in array])
is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')
ctx = datafusion.SessionContext()
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]], name="batch_array")
Additionally the :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined Aggregate Functions (UDAFs)
df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()
Aggregate Functions
-------------------

The :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined
Aggregate Functions (UDAFs). To use this you must implement an
:py:class:`~datafusion.udf.Accumulator` that determines how the aggregation is performed.

When defining a UDAF there are four methods you need to implement. The ``update`` function takes the
array(s) of input and updates the internal state of the accumulator. You should define this function
to have as many input arguments as you will pass when calling the UDAF. Since aggregation may be
split into multiple batches, we must have a method to combine multiple batches. For this, we have
two functions, ``state`` and ``merge``. ``state`` will return an array of scalar values that contain
the current state of a single batch accumulation. Then we must ``merge`` the results of these
different states. Finally ``evaluate`` is the call that will return the final result after the
``merge`` is complete.

In the following example we want to define a custom aggregate function that will return the
difference between the sum of two columns. The state can be represented by a single value and we can
also see how the inputs to ``update`` and ``merge`` differ.

.. code-block:: python
Expand All @@ -57,30 +123,122 @@ Additionally the :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows yo
Interface of a user-defined accumulation.
"""
def __init__(self):
self._sum = pyarrow.scalar(0.0)
self._sum = 0.0
def update(self, values: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
def update(self, values_a: pyarrow.Array, values_b: pyarrow.Array) -> None:
self._sum = self._sum + pyarrow.compute.sum(values_a).as_py() - pyarrow.compute.sum(values_b).as_py()
def merge(self, states: List[pyarrow.Array]) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states[0]).as_py())
self._sum = self._sum + pyarrow.compute.sum(states[0]).as_py()
def state(self) -> pyarrow.Array:
return pyarrow.array([self._sum.as_py()])
return pyarrow.array([self._sum])
def evaluate(self) -> pyarrow.Scalar:
return self._sum
return pyarrow.scalar(self._sum)
ctx = datafusion.SessionContext()
df = ctx.from_pydict(
{
"a": [1, 2, 3],
"b": [4, 5, 6],
"a": [4, 5, 6],
"b": [1, 2, 3],
}
)
my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
my_udaf = udaf(MyAccumulator, [pyarrow.float64(), pyarrow.float64()], pyarrow.float64(), [pyarrow.float64()], 'stable')
df.aggregate([], [my_udaf(col("a"), col("b")).alias("col_diff")])
Window Functions
----------------

To implement a User-Defined Window Function (UDWF) you must call the
:py:func:`~datafusion.udf.WindowUDF.udwf` function using a class that implements the abstract
class :py:class:`~datafusion.udf.WindowEvaluator`.

There are three methods of evaluation of UDWFs.

- ``evaluate`` is the simplest case, where you are given an array and are expected to calculate the
value for a single row of that array. This is the simplest case, but also the least performant.
- ``evaluate_all`` computes the values for all rows for an input array at a single time.
- ``evaluate_all_with_rank`` computes the values for all rows, but you only have the rank
information for the rows.

Which methods you implement are based upon which of these options are set.

.. list-table::
:header-rows: 1

* - ``uses_window_frame``
- ``supports_bounded_execution``
- ``include_rank``
- function_to_implement
* - False (default)
- False (default)
- False (default)
- ``evaluate_all``
* - False
- True
- False
- ``evaluate``
* - False
- True
- False
- ``evaluate_all_with_rank``
* - True
- True/False
- True/False
- ``evaluate``

UDWF options
^^^^^^^^^^^^

When you define your UDWF you can override the functions that return these values. They will
determine which evaluate functions are called.

- ``uses_window_frame`` is set for functions that compute based on the specified window frame. If
your function depends upon the specified frame, set this to ``True``.
- ``supports_bounded_execution`` specifies if your function can be incrementally computed.
- ``include_rank`` is set to ``True`` for window functions that can be computed only using the rank
information.


.. code-block:: python
import pyarrow as pa
from datafusion import udwf, col, SessionContext
from datafusion.udf import WindowEvaluator
class ExponentialSmooth(WindowEvaluator):
def __init__(self, alpha: float) -> None:
self.alpha = alpha
def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
results = []
curr_value = 0.0
values = values[0]
for idx in range(num_rows):
if idx == 0:
curr_value = values[idx].as_py()
else:
curr_value = values[idx].as_py() * self.alpha + curr_value * (
1.0 - self.alpha
)
results.append(curr_value)
return pa.array(results)
exp_smooth = udwf(
ExponentialSmooth(0.9),
pa.float64(),
pa.float64(),
volatility="immutable",
)
ctx = SessionContext()
df = ctx.from_pydict({
"a": [1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]
})
df.aggregate([],[my_udaf(col("a"))])
df.select("a", exp_smooth(col("a")).alias("smooth_a")).show()
Loading

0 comments on commit 022e4b3

Please sign in to comment.