diff --git a/docs/source/user-guide/common-operations/udf-and-udfa.rst b/docs/source/user-guide/common-operations/udf-and-udfa.rst
index e9c142f0..ffd7a05c 100644
--- a/docs/source/user-guide/common-operations/udf-and-udfa.rst
+++ b/docs/source/user-guide/common-operations/udf-and-udfa.rst
@@ -15,11 +15,24 @@
 .. specific language governing permissions and limitations
 .. under the License.
 
-User Defined Functions
+User-Defined Functions
 ======================
 
-DataFusion provides powerful expressions and functions, reducing the need for custom Python functions.
-However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs), with the :py:func:`~datafusion.udf.ScalarUDF.udf` function.
+DataFusion provides powerful expressions and functions, reducing the need for custom Python
+functions. However, you can still incorporate your own functions, i.e. User-Defined Functions (UDFs).
+
+Scalar Functions
+----------------
+
+A user-defined function that operates on a row-by-row basis is called a Scalar
+Function. You can define your own scalar function by calling
+:py:func:`~datafusion.udf.ScalarUDF.udf`.
+
+The basic definition of a scalar UDF is a Python function that takes one or more
+`pyarrow <https://arrow.apache.org/docs/python/index.html>`_ arrays and returns a single array as
+output. DataFusion scalar UDFs operate on an entire batch of records at a time, though the
+evaluation of those records should be on a row-by-row basis. In the following example, we compute
+whether each value of the input array is null.
 
 .. ipython:: python
 
@@ -35,14 +48,67 @@
 
     ctx = datafusion.SessionContext()
 
     batch = pyarrow.RecordBatch.from_arrays(
-        [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+        [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
         names=["a", "b"],
     )
     df = ctx.create_dataframe([[batch]], name="batch_array")
 
-    df.select(is_null_arr(col("a"))).to_pandas()
+    df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()
+
+In the previous example, we used the fact that pyarrow provides a variety of built-in array
+functions such as ``is_null()``. There are additional pyarrow
+`compute functions <https://arrow.apache.org/docs/python/compute.html>`_ available. When possible,
+it is highly recommended to use these functions because they can perform computations without doing
+any copy operations from the original arrays. This leads to greatly improved performance.
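+
+For instance, bounding values to a range can be done entirely with ``pyarrow.compute`` calls. The
+following is a minimal sketch; the ``clip`` function and its bounds are illustrative, not part of
+the library:
+
+.. code-block:: python
+
+    import pyarrow
+    import pyarrow.compute
+    from datafusion import udf
+
+    def clip(array: pyarrow.Array) -> pyarrow.Array:
+        # Both calls stay inside pyarrow, so no per-value Python conversion occurs.
+        clamped_low = pyarrow.compute.max_element_wise(array, 0.0)
+        return pyarrow.compute.min_element_wise(clamped_low, 1.0)
+
+    clip_udf = udf(clip, [pyarrow.float64()], pyarrow.float64(), "stable")
+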
+If you need to perform an operation in Python that is not available with the pyarrow compute
+functions, you will need to convert the record batch into Python values, perform your operation,
+and construct an array. Converting the built-in data type of the array into Python objects can be
+one of the slowest operations in DataFusion, so it should be done sparingly.
+
+The following example performs the same operation as before with ``is_null`` but demonstrates
+converting to Python objects to do the evaluation.
+
+.. ipython:: python
+
+    import pyarrow
+    import datafusion
+    from datafusion import udf, col
+
+    def is_null(array: pyarrow.Array) -> pyarrow.Array:
+        return pyarrow.array([value.as_py() is None for value in array])
+
+    is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')
+
+    ctx = datafusion.SessionContext()
+
+    batch = pyarrow.RecordBatch.from_arrays(
+        [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
+        names=["a", "b"],
+    )
+    df = ctx.create_dataframe([[batch]], name="batch_array")
 
-Additionally the :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined Aggregate Functions (UDAFs)
+    df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()
+
+Aggregate Functions
+-------------------
+
+The :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined
+Aggregate Functions (UDAFs). To use this, you must implement an
+:py:class:`~datafusion.udf.Accumulator` that determines how the aggregation is performed.
+
+When defining a UDAF, there are four methods you need to implement. The ``update`` method takes
+the array(s) of input and updates the internal state of the accumulator. You should define this
+method to have as many input arguments as you will pass when calling the UDAF. Since aggregation
+may be split into multiple batches, we need a way to combine the results of those batches. For
+this, we have two methods, ``state`` and ``merge``. ``state`` returns an array of scalar values
+that contain the current state of a single batch accumulation. Then we must ``merge`` the results
+of these different states. Finally, ``evaluate`` returns the final result after the ``merge`` is
+complete.
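+
+As a rough sketch of the lifecycle, the engine creates one accumulator per batch, calls ``update``
+on each, exchanges partial results through ``state`` and ``merge``, and calls ``evaluate`` once at
+the end. The manual driver calls below are shown only to illustrate that order; in practice
+DataFusion performs them internally:
+
+.. code-block:: python
+
+    import pyarrow
+    import pyarrow.compute
+    from datafusion import Accumulator
+
+    class SumAccumulator(Accumulator):
+        def __init__(self):
+            self._sum = 0.0
+
+        def update(self, values: pyarrow.Array) -> None:
+            self._sum += pyarrow.compute.sum(values).as_py()
+
+        def merge(self, states) -> None:
+            self._sum += pyarrow.compute.sum(states[0]).as_py()
+
+        def state(self) -> pyarrow.Array:
+            return pyarrow.array([self._sum])
+
+        def evaluate(self) -> pyarrow.Scalar:
+            return pyarrow.scalar(self._sum)
+
+    # Illustrative only: DataFusion performs these calls internally.
+    acc_a, acc_b = SumAccumulator(), SumAccumulator()
+    acc_a.update(pyarrow.array([1.0, 2.0]))  # first batch
+    acc_b.update(pyarrow.array([3.0, 4.0]))  # second batch
+    acc_a.merge([acc_b.state()])             # fold the second partial state into the first
+    acc_a.evaluate()                         # pyarrow scalar 10.0
+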
+In the following example, we define a custom aggregate function that returns the difference
+between the sum of two columns. The state can be represented by a single value, and we can also
+see how the inputs to ``update`` and ``merge`` differ.
 
 .. code-block:: python
 
@@ -57,30 +123,122 @@
     Interface of a user-defined accumulation.
     """
     def __init__(self):
-        self._sum = pyarrow.scalar(0.0)
+        self._sum = 0.0
 
-    def update(self, values: pyarrow.Array) -> None:
-        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
+    def update(self, values_a: pyarrow.Array, values_b: pyarrow.Array) -> None:
+        self._sum = self._sum + pyarrow.compute.sum(values_a).as_py() - pyarrow.compute.sum(values_b).as_py()
 
     def merge(self, states: List[pyarrow.Array]) -> None:
-        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states[0]).as_py())
+        self._sum = self._sum + pyarrow.compute.sum(states[0]).as_py()
 
     def state(self) -> pyarrow.Array:
-        return pyarrow.array([self._sum.as_py()])
+        return pyarrow.array([self._sum])
 
     def evaluate(self) -> pyarrow.Scalar:
-        return self._sum
+        return pyarrow.scalar(self._sum)
 
     ctx = datafusion.SessionContext()
     df = ctx.from_pydict(
         {
-            "a": [1, 2, 3],
-            "b": [4, 5, 6],
+            "a": [4, 5, 6],
+            "b": [1, 2, 3],
         }
     )
 
-    my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
+    my_udaf = udaf(MyAccumulator, [pyarrow.float64(), pyarrow.float64()], pyarrow.float64(), [pyarrow.float64()], 'stable')
+
+    df.aggregate([], [my_udaf(col("a"), col("b")).alias("col_diff")])
+
+Window Functions
+----------------
+
+To implement a User-Defined Window Function (UDWF) you must call the
+:py:func:`~datafusion.udf.WindowUDF.udwf` function using a class that implements the abstract
+class :py:class:`~datafusion.udf.WindowEvaluator`.
+
+There are three evaluation methods for UDWFs.
+
+- ``evaluate`` is given an array and a row range and calculates the value for a single row of
+  that array. This is the most general method, but also the least performant.
+- ``evaluate_all`` computes the values for all rows of an input array in a single pass.
+- ``evaluate_all_with_rank`` computes the values for all rows, but you only have the rank
+  information for the rows.
+
+Which methods you implement depends upon which of the options described below are set.
+
+.. list-table::
+   :header-rows: 1
+
+   * - ``uses_window_frame``
+     - ``supports_bounded_execution``
+     - ``include_rank``
+     - function_to_implement
+   * - False (default)
+     - False (default)
+     - False (default)
+     - ``evaluate_all``
+   * - False
+     - True
+     - False
+     - ``evaluate``
+   * - False
+     - True/False
+     - True
+     - ``evaluate_all_with_rank``
+   * - True
+     - True/False
+     - True/False
+     - ``evaluate``
+
+UDWF options
+^^^^^^^^^^^^
+
+When you define your UDWF, you can override the functions that return these values; they
+determine which evaluation methods are called.
+
+- ``uses_window_frame`` is set for functions that compute based on the specified window frame. If
+  your function depends upon the specified frame, set this to ``True``.
+- ``supports_bounded_execution`` specifies if your function can be incrementally computed.
+- ``include_rank`` is set to ``True`` for window functions that can be computed only using the rank
+  information.
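+
+For example, an evaluator that only ever needs the previous and current rows can override
+``supports_bounded_execution`` and ``get_range`` and then implement ``evaluate``. The following is
+a condensed sketch of that configuration; the ``RunningDiff`` evaluator is illustrative, not part
+of the library:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    from datafusion import udwf
+    from datafusion.udf import WindowEvaluator
+
+    class RunningDiff(WindowEvaluator):
+        """Difference between the current row and the previous row."""
+
+        def supports_bounded_execution(self) -> bool:
+            return True
+
+        def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
+            # Only the previous row and the current row are required.
+            return (max(idx - 1, 0), idx)
+
+        def evaluate(
+            self, values: list[pa.Array], eval_range: tuple[int, int]
+        ) -> pa.Scalar:
+            (start, stop) = eval_range
+            array = values[0]
+            return pa.scalar(array[stop].as_py() - array[start].as_py()).cast(pa.float64())
+
+    running_diff = udwf(RunningDiff(), pa.float64(), pa.float64(), volatility="immutable")
+
+Like scalar and aggregate UDFs, a window UDF can also be registered with a
+:py:class:`~datafusion.context.SessionContext` via ``register_udwf`` if you want to call it
+from SQL.
+
+The following example uses the simplest configuration, implementing only ``evaluate_all``.
+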
+.. code-block:: python
+
+    import pyarrow as pa
+    from datafusion import udwf, col, SessionContext
+    from datafusion.udf import WindowEvaluator
+
+    class ExponentialSmooth(WindowEvaluator):
+        def __init__(self, alpha: float) -> None:
+            self.alpha = alpha
+
+        def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
+            results = []
+            curr_value = 0.0
+            values = values[0]
+            for idx in range(num_rows):
+                if idx == 0:
+                    curr_value = values[idx].as_py()
+                else:
+                    curr_value = values[idx].as_py() * self.alpha + curr_value * (
+                        1.0 - self.alpha
+                    )
+                results.append(curr_value)
+
+            return pa.array(results)
+
+    exp_smooth = udwf(
+        ExponentialSmooth(0.9),
+        pa.float64(),
+        pa.float64(),
+        volatility="immutable",
+    )
+
+    ctx = SessionContext()
+
+    df = ctx.from_pydict({
+        "a": [1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]
+    })
 
-    df.aggregate([],[my_udaf(col("a"))])
+    df.select("a", exp_smooth(col("a")).alias("smooth_a")).show()
diff --git a/examples/python-udwf.py b/examples/python-udwf.py
new file mode 100644
index 00000000..05b3021d
--- /dev/null
+++ b/examples/python-udwf.py
@@ -0,0 +1,270 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+import datafusion
+from datafusion import udwf, functions as f, col, lit
+from datafusion.udf import WindowEvaluator
+from datafusion.expr import WindowFrame
+
+# This example creates five different user-defined window functions in order
+# to demonstrate the variety of ways a user may need to implement them.
+
+
+class ExponentialSmoothDefault(WindowEvaluator):
+    """Create a running smooth operation across an entire partition at once."""
+
+    def __init__(self, alpha: float) -> None:
+        self.alpha = alpha
+
+    def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
+        results = []
+        curr_value = 0.0
+        values = values[0]
+        for idx in range(num_rows):
+            if idx == 0:
+                curr_value = values[idx].as_py()
+            else:
+                curr_value = values[idx].as_py() * self.alpha + curr_value * (
+                    1.0 - self.alpha
+                )
+            results.append(curr_value)
+
+        return pa.array(results)
+
+
+class SmoothBoundedFromPreviousRow(WindowEvaluator):
+    """Smooth over from the previous to current row only."""
+
+    def __init__(self, alpha: float) -> None:
+        self.alpha = alpha
+
+    def supports_bounded_execution(self) -> bool:
+        return True
+
+    def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
+        # Override the default range of the current row since uses_window_frame is
+        # False. For the purpose of this example we just smooth from the previous
+        # row to the current one.
+        if idx == 0:
+            return (0, 0)
+        return (idx - 1, idx)
+
+    def evaluate(
+        self, values: list[pa.Array], eval_range: tuple[int, int]
+    ) -> pa.Scalar:
+        (start, stop) = eval_range
+        curr_value = 0.0
+        values = values[0]
+        for idx in range(start, stop + 1):
+            if idx == start:
+                curr_value = values[idx].as_py()
+            else:
+                curr_value = values[idx].as_py() * self.alpha + curr_value * (
+                    1.0 - self.alpha
+                )
+        return pa.scalar(curr_value).cast(pa.float64())
+
+
+class SmoothAcrossRank(WindowEvaluator):
+    """Smooth over the rank from the previous rank to current."""
+
+    def __init__(self, alpha: float) -> None:
+        self.alpha = alpha
+
+    def include_rank(self) -> bool:
+        return True
+
+    def evaluate_all_with_rank(
+        self, num_rows: int, ranks_in_partition: list[tuple[int, int]]
+    ) -> pa.Array:
+        results = []
+        for idx in range(num_rows):
+            if idx == 0:
+                prior_value = 1.0
+            matching_row = [
+                i
+                for i in range(len(ranks_in_partition))
+                if ranks_in_partition[i][0] <= idx and ranks_in_partition[i][1] > idx
+            ][0] + 1
+            curr_value = matching_row * self.alpha + prior_value * (1.0 - self.alpha)
+            results.append(curr_value)
+            prior_value = matching_row
+
+        return pa.array(results)
+
+
+class ExponentialSmoothFrame(WindowEvaluator):
+    "Find the value across an entire frame using exponential smoothing"
+
+    def __init__(self, alpha: float) -> None:
+        self.alpha = alpha
+
+    def uses_window_frame(self) -> bool:
+        return True
+
+    def evaluate(
+        self, values: list[pa.Array], eval_range: tuple[int, int]
+    ) -> pa.Scalar:
+        (start, stop) = eval_range
+        curr_value = 0.0
+        if len(values) > 1:
+            order_by = values[1]  # noqa: F841
+            values = values[0]
+        else:
+            values = values[0]
+        for idx in range(start, stop):
+            if idx == start:
+                curr_value = values[idx].as_py()
+            else:
+                curr_value = values[idx].as_py() * self.alpha + curr_value * (
+                    1.0 - self.alpha
+                )
+        return pa.scalar(curr_value).cast(pa.float64())
+
+
+class SmoothTwoColumn(WindowEvaluator):
+    """Smooth one column based on a condition of another column.
+
+    If the second column is null, then smooth over the first column from
+    the previous and next rows.
+ """ + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + values_a = values[0] + values_b = values[1] + for idx in range(num_rows): + if not values_b[idx].is_valid: + if idx == 0: + results.append(values_a[1].cast(pa.float64())) + elif idx == num_rows - 1: + results.append(values_a[num_rows - 2].cast(pa.float64())) + else: + results.append( + pa.scalar( + values_a[idx - 1].as_py() * self.alpha + + values_a[idx + 1].as_py() * (1.0 - self.alpha) + ) + ) + else: + results.append(values_a[idx].cast(pa.float64())) + + return pa.array(results) + + +# create a context +ctx = datafusion.SessionContext() + +# create a RecordBatch and a new DataFrame from it +batch = pa.RecordBatch.from_arrays( + [ + pa.array([1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]), + pa.array([1, 2, None, 4, 5, 6, None, 8]), + pa.array(["A", "A", "A", "A", "A", "B", "B", "B"]), + ], + names=["a", "b", "c"], +) +df = ctx.create_dataframe([[batch]]) + +exp_smooth = udwf( + ExponentialSmoothDefault(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_two_row = udwf( + SmoothBoundedFromPreviousRow(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_rank = udwf( + SmoothAcrossRank(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_frame = udwf( + ExponentialSmoothFrame(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_two_col = udwf( + SmoothTwoColumn(0.9), + [pa.float64(), pa.int64()], + pa.float64(), + volatility="immutable", +) + +# These are done with separate statements instead of one large `select` because that will +# attempt to combine the window operations and our defined UDFs do not all support that. 
+( + df.with_column("exp_smooth", exp_smooth(col("a"))) + .with_column("smooth_prior_row", smooth_two_row(col("a"))) + .with_column("smooth_rank", smooth_rank(col("a")).order_by(col("c")).build()) + .with_column("smooth_two_col", smooth_two_col(col("a"), col("b"))) + .with_column( + "smooth_frame", + smooth_frame(col("a")).window_frame(WindowFrame("rows", None, 0)).build(), + ) + .select( + "a", + "b", + "c", + "exp_smooth", + "smooth_prior_row", + "smooth_rank", + "smooth_two_col", + "smooth_frame", + ) +).show() + +assert df.select(f.round(exp_smooth(col("a")), lit(3))).collect()[0].column( + 0 +) == pa.array([1, 1.99, 2.809, 3.881, 4.978, 5.898, 6.8, 7.88]) + + +assert df.select(f.round(smooth_two_row(col("a")), lit(3))).collect()[0].column( + 0 +) == pa.array([1.0, 1.99, 2.82, 3.89, 4.99, 5.91, 6.81, 7.89]) + + +assert df.select(smooth_rank(col("a")).order_by(col("c")).build()).collect()[0].column( + 0 +) == pa.array([1, 1, 1, 1, 1, 1.9, 2.0, 2.0]) + + +assert df.select(smooth_two_col(col("a"), col("b"))).collect()[0].column(0) == pa.array( + [1, 2.1, 2.29, 4, 5.1, 6, 6.2, 8.0] +) + + +assert df.select( + f.round( + smooth_frame(col("a")).window_frame(WindowFrame("rows", None, 0)).build(), + lit(3), + ) +).collect()[0].column(0) == pa.array([1, 1.99, 2.809, 3.881, 4.978, 5.898, 6.8, 7.88]) diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index 08ca3fe0..4f40b208 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -40,7 +40,7 @@ from .record_batch import RecordBatchStream, RecordBatch -from .udf import ScalarUDF, AggregateUDF, Accumulator +from .udf import ScalarUDF, AggregateUDF, Accumulator, WindowUDF from .common import ( DFSchema, @@ -78,6 +78,7 @@ "Database", "Table", "AggregateUDF", + "WindowUDF", "LogicalPlan", "ExecutionPlan", "RecordBatch", @@ -113,3 +114,5 @@ def lit(value): udf = ScalarUDF.udf udaf = AggregateUDF.udaf + +udwf = WindowUDF.udwf diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 35a40ccd..59d053e1 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -25,12 +25,11 @@ from ._internal import SessionContext as SessionContextInternal from ._internal import LogicalPlan, ExecutionPlan -from datafusion._internal import AggregateUDF from datafusion.catalog import Catalog, Table from datafusion.dataframe import DataFrame from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list from datafusion.record_batch import RecordBatchStream -from datafusion.udf import ScalarUDF +from datafusion.udf import ScalarUDF, AggregateUDF, WindowUDF from typing import Any, TYPE_CHECKING from typing_extensions import deprecated @@ -833,6 +832,10 @@ def register_udaf(self, udaf: AggregateUDF) -> None: """Register a user-defined aggregation function (UDAF) with the context.""" self.ctx.register_udaf(udaf._udaf) + def register_udwf(self, udwf: WindowUDF) -> None: + """Register a user-defined window function (UDWF) with the context.""" + self.ctx.register_udwf(udwf._udwf) + def catalog(self, name: str = "datafusion") -> Catalog: """Retrieve a catalog by name.""" return self.ctx.catalog(name) diff --git a/python/datafusion/tests/test_udwf.py b/python/datafusion/tests/test_udwf.py new file mode 100644 index 00000000..67c0979f --- /dev/null +++ b/python/datafusion/tests/test_udwf.py @@ -0,0 +1,294 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyarrow as pa +import pytest + +from datafusion import SessionContext, column, udwf, lit, functions as f +from datafusion.udf import WindowEvaluator +from datafusion.expr import WindowFrame + + +class ExponentialSmoothDefault(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + curr_value = 0.0 + values = values[0] + for idx in range(num_rows): + if idx == 0: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + results.append(curr_value) + + return pa.array(results) + + +class ExponentialSmoothBounded(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def supports_bounded_execution(self) -> bool: + return True + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + # Override the default range of current row since uses_window_frame is False + # So for the purpose of this test we just smooth from the previous row to + # current. 
+ if idx == 0: + return (0, 0) + return (idx - 1, idx) + + def evaluate( + self, values: list[pa.Array], eval_range: tuple[int, int] + ) -> pa.Scalar: + (start, stop) = eval_range + curr_value = 0.0 + values = values[0] + for idx in range(start, stop + 1): + if idx == start: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + return pa.scalar(curr_value).cast(pa.float64()) + + +class ExponentialSmoothRank(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def include_rank(self) -> bool: + return True + + def evaluate_all_with_rank( + self, num_rows: int, ranks_in_partition: list[tuple[int, int]] + ) -> pa.Array: + results = [] + for idx in range(num_rows): + if idx == 0: + prior_value = 1.0 + matching_row = [ + i + for i in range(len(ranks_in_partition)) + if ranks_in_partition[i][0] <= idx and ranks_in_partition[i][1] > idx + ][0] + 1 + curr_value = matching_row * self.alpha + prior_value * (1.0 - self.alpha) + results.append(curr_value) + prior_value = matching_row + + return pa.array(results) + + +class ExponentialSmoothFrame(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def uses_window_frame(self) -> bool: + return True + + def evaluate( + self, values: list[pa.Array], eval_range: tuple[int, int] + ) -> pa.Scalar: + (start, stop) = eval_range + curr_value = 0.0 + if len(values) > 1: + order_by = values[1] # noqa: F841 + values = values[0] + else: + values = values[0] + for idx in range(start, stop): + if idx == start: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + return pa.scalar(curr_value).cast(pa.float64()) + + +class SmoothTwoColumn(WindowEvaluator): + """This class demonstrates using two columns. + + If the second column is above a threshold, then smooth over the first column from + the previous and next rows. 
+ """ + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + values_a = values[0] + values_b = values[1] + for idx in range(num_rows): + if values_b[idx].as_py() > 7: + if idx == 0: + results.append(values_a[1].cast(pa.float64())) + elif idx == num_rows - 1: + results.append(values_a[num_rows - 2].cast(pa.float64())) + else: + results.append( + pa.scalar( + values_a[idx - 1].as_py() * self.alpha + + values_a[idx + 1].as_py() * (1.0 - self.alpha) + ) + ) + else: + results.append(values_a[idx].cast(pa.float64())) + + return pa.array(results) + + +class NotSubclassOfWindowEvaluator: + pass + + +@pytest.fixture +def df(): + ctx = SessionContext() + + # create a RecordBatch and a new DataFrame from it + batch = pa.RecordBatch.from_arrays( + [ + pa.array([0, 1, 2, 3, 4, 5, 6]), + pa.array([7, 4, 3, 8, 9, 1, 6]), + pa.array(["A", "A", "A", "A", "B", "B", "B"]), + ], + names=["a", "b", "c"], + ) + return ctx.create_dataframe([[batch]]) + + +def test_udwf_errors(df): + with pytest.raises(TypeError): + udwf( + NotSubclassOfWindowEvaluator(), + pa.float64(), + pa.float64(), + volatility="immutable", + ) + + +smooth_default = udwf( + ExponentialSmoothDefault(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_bounded = udwf( + ExponentialSmoothBounded(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_rank = udwf( + ExponentialSmoothRank(0.9), + pa.utf8(), + pa.float64(), + volatility="immutable", +) + +smooth_frame = udwf( + ExponentialSmoothFrame(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_two_col = udwf( + SmoothTwoColumn(0.9), + [pa.int64(), pa.int64()], + pa.float64(), + volatility="immutable", +) + +data_test_udwf_functions = [ + ( + "default_udwf", + smooth_default(column("a")), + [0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889], + ), + ( + "default_udwf_partitioned", + smooth_default(column("a")).partition_by(column("c")).build(), + [0, 0.9, 1.89, 2.889, 4.0, 4.9, 5.89], + ), + ( + "default_udwf_ordered", + smooth_default(column("a")).order_by(column("b")).build(), + [0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513], + ), + ( + "bounded_udwf", + smooth_bounded(column("a")), + [0, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9], + ), + ( + "bounded_udwf_ignores_frame", + smooth_bounded(column("a")) + .window_frame(WindowFrame("rows", None, None)) + .build(), + [0, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9], + ), + ( + "rank_udwf", + smooth_rank(column("c")).order_by(column("c")).build(), + [1, 1, 1, 1, 1.9, 2, 2], + ), + ( + "frame_unbounded_udwf", + smooth_frame(column("a")).window_frame(WindowFrame("rows", None, None)).build(), + [5.889, 5.889, 5.889, 5.889, 5.889, 5.889, 5.889], + ), + ( + "frame_bounded_udwf", + smooth_frame(column("a")).window_frame(WindowFrame("rows", None, 0)).build(), + [0.0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889], + ), + ( + "frame_bounded_udwf", + smooth_frame(column("a")) + .window_frame(WindowFrame("rows", None, 0)) + .order_by(column("b")) + .build(), + [0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513], + ), + ( + "two_column_udwf", + smooth_two_col(column("a"), column("b")), + [0.0, 1.0, 2.0, 2.2, 3.2, 5.0, 6.0], + ), +] + + +@pytest.mark.parametrize("name,expr,expected", data_test_udwf_functions) +def test_udwf_functions(df, name, expr, expected): + df = df.select("a", "b", f.round(expr, lit(3)).alias(name)) + + # execute and collect the first (and only) batch + result = 
df.sort(column("a")).select(column(name)).collect()[0]
+
+    assert result.column(0) == pa.array(expected)
diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py
index f74d675e..bb7a9086 100644
--- a/python/datafusion/udf.py
+++ b/python/datafusion/udf.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Provides the user defined functions for evaluation of dataframes."""
+"""Provides the user-defined functions for evaluation of dataframes."""
 
 from __future__ import annotations
 
@@ -76,7 +76,7 @@ def __str__(self):
 
 
 class ScalarUDF:
-    """Class for performing scalar user defined functions (UDF).
+    """Class for performing scalar user-defined functions (UDF).
 
     Scalar UDFs operate on a row by row basis. See also
     :py:class:`AggregateUDF` for operating on a group of rows.
@@ -90,7 +90,7 @@ def __init__(
         return_type: _R,
         volatility: Volatility | str,
     ) -> None:
-        """Instantiate a scalar user defined function (UDF).
+        """Instantiate a scalar user-defined function (UDF).
 
         See helper method :py:func:`udf` for argument details.
         """
@@ -115,7 +115,7 @@ def udf(
         volatility: Volatility | str,
         name: str | None = None,
     ) -> ScalarUDF:
-        """Create a new User Defined Function.
+        """Create a new User-Defined Function.
 
         Args:
             func: A callable python function.
@@ -127,7 +127,7 @@
             name: A descriptive name for the function.
 
         Returns:
-            A user defined aggregate function, which can be used in either data
+            A user-defined function, which can be used in either data
             aggregation or window function calls.
         """
         if not callable(func):
@@ -152,7 +152,7 @@ def state(self) -> List[pyarrow.Scalar]:
         pass
 
     @abstractmethod
-    def update(self, values: pyarrow.Array) -> None:
+    def update(self, *values: pyarrow.Array) -> None:
        """Evaluate an array of values and update state."""
        pass
 
@@ -172,7 +172,7 @@ def evaluate(self) -> pyarrow.Scalar:
 
 
 class AggregateUDF:
-    """Class for performing scalar user defined functions (UDF).
+    """Class for performing aggregate user-defined functions (UDAF).
 
     Aggregate UDFs operate on a group of rows and return a single value. See
     also :py:class:`ScalarUDF` for operating on a row by row basis.
@@ -187,7 +187,7 @@ def __init__(
         state_type: list[pyarrow.DataType],
         volatility: Volatility | str,
     ) -> None:
-        """Instantiate a user defined aggregate function (UDAF).
+        """Instantiate a user-defined aggregate function (UDAF).
 
         See :py:func:`udaf` for a convenience function and argument
         descriptions.
@@ -214,7 +214,7 @@ def udaf(
         volatility: Volatility | str,
         name: str | None = None,
     ) -> AggregateUDF:
-        """Create a new User Defined Aggregate Function.
+        """Create a new User-Defined Aggregate Function.
 
         The accumulator function must be callable and implement :py:class:`Accumulator`.
 
@@ -227,7 +227,7 @@
             name: A descriptive name for the function.
 
         Returns:
-            A user defined aggregate function, which can be used in either data
+            A user-defined aggregate function, which can be used in either data
             aggregation or window function calls.
         """
         if not issubclass(accum, Accumulator):
@@ -246,3 +246,237 @@ def udaf(
             state_type=state_type,
             volatility=volatility,
         )
+
+
+class WindowEvaluator(metaclass=ABCMeta):
+    """Evaluator class for user-defined window functions (UDWF).
+
+    It is up to the user to decide which evaluate function is appropriate.
+
+    +------------------------+--------------------------------+------------------+---------------------------+
+    | ``uses_window_frame``  | ``supports_bounded_execution`` | ``include_rank`` | function_to_implement     |
+    +========================+================================+==================+===========================+
+    | False (default)        | False (default)                | False (default)  | ``evaluate_all``          |
+    +------------------------+--------------------------------+------------------+---------------------------+
+    | False                  | True                           | False            | ``evaluate``              |
+    +------------------------+--------------------------------+------------------+---------------------------+
+    | False                  | True/False                     | True             | ``evaluate_all_with_rank``|
+    +------------------------+--------------------------------+------------------+---------------------------+
+    | True                   | True/False                     | True/False       | ``evaluate``              |
+    +------------------------+--------------------------------+------------------+---------------------------+
+    """  # noqa: W505
+
+    def memoize(self) -> None:
+        """Perform a memoize operation to improve performance.
+
+        When the window frame has a fixed beginning (e.g. UNBOUNDED
+        PRECEDING), some functions such as FIRST_VALUE and
+        NTH_VALUE do not need the (unbounded) input once they have
+        seen a certain amount of input.
+
+        ``memoize`` is called after each input batch is processed, and
+        such functions can save whatever they need.
+        """
+        pass
+
+    def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
+        """Return the range for the window function.
+
+        If the ``uses_window_frame`` flag is ``false``, this method is used to
+        calculate the required range for the window function during
+        stateful execution.
+
+        Generally there is no required range, so by default this returns
+        the smallest range (the current row); e.g. seeing the current row
+        is enough to calculate the window result (such as row_number,
+        rank, etc.).
+
+        Args:
+            idx: Current index.
+            num_rows: Number of rows.
+        """
+        return (idx, idx + 1)
+
+    def is_causal(self) -> bool:
+        """Get whether evaluator needs future data for its result."""
+        return False
+
+    def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array:
+        """Evaluate a window function on an entire input partition.
+
+        This function is called once per input *partition* for window functions that
+        *do not use* values from the window frame, such as
+        :py:func:`~datafusion.functions.row_number`, :py:func:`~datafusion.functions.rank`,
+        :py:func:`~datafusion.functions.dense_rank`, :py:func:`~datafusion.functions.percent_rank`,
+        :py:func:`~datafusion.functions.cume_dist`, :py:func:`~datafusion.functions.lead`,
+        and :py:func:`~datafusion.functions.lag`.
+
+        It produces the result of all rows in a single pass. It
+        expects to receive the entire partition as the ``values`` and
+        must produce an output column with one output row for every
+        input row.
+
+        ``num_rows`` is required to correctly compute the output in case
+        ``len(values) == 0``.
+
+        Implementing this function is an optimization. Certain window
+        functions are not affected by the window frame definition or
+        the query doesn't have a frame, and ``evaluate_all`` skips the
+        (costly) window frame boundary calculation and the overhead of
+        calling ``evaluate`` for each output row.
+
+        For example, the ``lag`` built-in window function does not use
+        the values of its window frame (it can be computed in one shot
+        on the entire partition with ``evaluate_all`` regardless of the
+        window defined in the ``OVER`` clause)
+
+        .. code-block:: text
+
+            lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
+
+        However, ``avg()`` computes the average in the window and thus
+        does use its window frame.
+
+        .. code-block:: text
+
+            avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
+        """  # noqa: W505
+        pass
+
+    def evaluate(
+        self, values: list[pyarrow.Array], eval_range: tuple[int, int]
+    ) -> pyarrow.Scalar:
+        """Evaluate window function on a range of rows in an input partition.
+
+        This is the simplest and most general function to implement
+        but also the least performant, as it creates output one row at
+        a time. It is typically much faster to implement stateful
+        evaluation using one of the other specialized methods on this
+        class.
+
+        Returns a ``pyarrow.Scalar`` that is the value of the window
+        function within ``eval_range`` for the entire partition. Argument
+        ``values`` contains the evaluation results of the function arguments
+        and of the ORDER BY expressions. If the function has a single
+        argument, ``values[1:]`` will contain the ORDER BY expression results.
+        """
+        pass
+
+    def evaluate_all_with_rank(
+        self, num_rows: int, ranks_in_partition: list[tuple[int, int]]
+    ) -> pyarrow.Array:
+        """Called for window functions that only need the rank of a row.
+
+        Evaluate the partition evaluator against the partition using
+        the row ranks. For example, ``rank(col("a"))`` produces
+
+        .. code-block:: text
+
+            a | rank
+            - + ----
+            A | 1
+            A | 1
+            C | 3
+            D | 4
+            D | 4
+
+        For this case, ``num_rows`` would be ``5`` and
+        ``ranks_in_partition`` would be called with
+
+        .. code-block:: text
+
+            [
+                (0,1),
+                (2,2),
+                (3,4),
+            ]
+
+        The user must implement this method if ``include_rank`` returns True.
+        """
+        pass
+
+    def supports_bounded_execution(self) -> bool:
+        """Can the window function be incrementally computed using bounded memory?"""
+        return False
+
+    def uses_window_frame(self) -> bool:
+        """Does the window function use the values from the window frame?"""
+        return False
+
+    def include_rank(self) -> bool:
+        """Can this function be evaluated with (only) rank?"""
+        return False
+
+
+if TYPE_CHECKING:
+    _W = TypeVar("_W", bound=WindowEvaluator)
+
+
+class WindowUDF:
+    """Class for performing window user-defined functions (UDWF).
+
+    Window UDFs operate on a partition of rows. See
+    also :py:class:`ScalarUDF` for operating on a row-by-row basis.
+    """
+
+    def __init__(
+        self,
+        name: str | None,
+        func: WindowEvaluator,
+        input_types: list[pyarrow.DataType],
+        return_type: pyarrow.DataType,
+        volatility: Volatility | str,
+    ) -> None:
+        """Instantiate a user-defined window function (UDWF).
+
+        See :py:func:`udwf` for a convenience function and argument
+        descriptions.
+        """
+        self._udwf = df_internal.WindowUDF(
+            name, func, input_types, return_type, str(volatility)
+        )
+
+    def __call__(self, *args: Expr) -> Expr:
+        """Execute the UDWF.
+
+        This function is not typically called by an end user. These calls will
+        occur during the evaluation of the dataframe.
+        """
+        args_raw = [arg.expr for arg in args]
+        return Expr(self._udwf.__call__(*args_raw))
+
+    @staticmethod
+    def udwf(
+        func: WindowEvaluator,
+        input_types: pyarrow.DataType | list[pyarrow.DataType],
+        return_type: pyarrow.DataType,
+        volatility: Volatility | str,
+        name: str | None = None,
+    ) -> WindowUDF:
+        """Create a new User-Defined Window Function.
+
+        Args:
+            func: A ``WindowEvaluator`` instance implementing the window function.
+            input_types: The data types of the arguments to ``func``.
+            return_type: The data type of the return value.
+            volatility: See :py:class:`Volatility` for allowed values.
+            name: A descriptive name for the function.
+
+        Returns:
+            A user-defined window function.
+        """
+        if not isinstance(func, WindowEvaluator):
+            raise TypeError(
+                "`func` must implement the abstract base class WindowEvaluator"
+            )
+        if name is None:
+            name = func.__class__.__qualname__.lower()
+        if isinstance(input_types, pyarrow.DataType):
+            input_types = [input_types]
+        return WindowUDF(
+            name=name,
+            func=func,
+            input_types=input_types,
+            return_type=return_type,
+            volatility=volatility,
+        )
diff --git a/src/context.rs b/src/context.rs
index 79db2e65..584823e1 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -42,6 +42,7 @@ use crate::sql::logical::PyLogicalPlan;
 use crate::store::StorageContexts;
 use crate::udaf::PyAggregateUDF;
 use crate::udf::PyScalarUDF;
+use crate::udwf::PyWindowUDF;
 use crate::utils::{get_tokio_runtime, wait_for_future};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::arrow::pyarrow::PyArrowType;
@@ -746,6 +747,11 @@ impl PySessionContext {
         Ok(())
     }
 
+    pub fn register_udwf(&mut self, udwf: PyWindowUDF) -> PyResult<()> {
+        self.ctx.register_udwf(udwf.function);
+        Ok(())
+    }
+
     #[pyo3(signature = (name="datafusion"))]
     pub fn catalog(&self, name: &str) -> PyResult<PyCatalog> {
         match self.ctx.catalog(name) {
diff --git a/src/lib.rs b/src/lib.rs
index e4cc2407..98821833 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -58,6 +58,7 @@ pub mod substrait;
 mod udaf;
 #[allow(clippy::borrow_deref_ref)]
 mod udf;
+mod udwf;
 pub mod utils;
 
 #[cfg(feature = "mimalloc")]
@@ -90,6 +91,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<udwf::PyWindowUDF>()?;
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
diff --git a/src/udwf.rs b/src/udwf.rs
new file mode 100644
index 00000000..31cc5e60
--- /dev/null
+++ b/src/udwf.rs
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{make_array, Array, ArrayData, ArrayRef};
+use datafusion::logical_expr::window_state::WindowAggState;
+use datafusion::scalar::ScalarValue;
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+
+use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::{
+    PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl,
+};
+use pyo3::types::{PyList, PyTuple};
+
+use crate::expr::PyExpr;
+use crate::utils::parse_volatility;
+
+#[derive(Debug)]
+struct RustPartitionEvaluator {
+    evaluator: PyObject,
+}
+
+impl RustPartitionEvaluator {
+    fn new(evaluator: PyObject) -> Self {
+        Self { evaluator }
+    }
+}
+
+impl PartitionEvaluator for RustPartitionEvaluator {
+    fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
+        Python::with_gil(|py| self.evaluator.bind(py).call_method0("memoize").map(|_| ()))
+            .map_err(|e| DataFusionError::Execution(format!("{e}")))
+    }
+
+    fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
+        Python::with_gil(|py| {
+            let py_args = vec![idx.to_object(py), n_rows.to_object(py)];
+            let py_args = PyTuple::new_bound(py, py_args);
+
+            self.evaluator
+                .bind(py)
+                .call_method1("get_range", py_args)
+                .and_then(|v| {
+                    let tuple: Bound<'_, PyTuple> = v.extract()?;
+                    if tuple.len() != 2 {
+                        return Err(PyValueError::new_err(format!(
+                            "Expected get_range to return tuple of length 2. Received length {}",
+                            tuple.len()
+                        )));
+                    }
+
+                    let start: usize = tuple.get_item(0).unwrap().extract()?;
+                    let end: usize = tuple.get_item(1).unwrap().extract()?;
+
+                    Ok(Range { start, end })
+                })
+        })
+        .map_err(|e| DataFusionError::Execution(format!("{e}")))
+    }
+
+    fn is_causal(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("is_causal")
+                .and_then(|v| v.extract())
+                .unwrap_or(false)
+        })
+    }
+
+    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+        Python::with_gil(|py| {
+            let py_values = PyList::new_bound(
+                py,
+                values
+                    .iter()
+                    .map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
+            );
+            let py_num_rows = num_rows.to_object(py).into_bound(py);
+            let py_args = PyTuple::new_bound(
+                py,
+                PyTuple::new_bound(py, vec![py_values.as_any(), &py_num_rows]),
+            );
+
+            self.evaluator
+                .bind(py)
+                .call_method1("evaluate_all", py_args)
+                .map(|v| {
+                    let array_data = ArrayData::from_pyarrow_bound(&v).unwrap();
+                    make_array(array_data)
+                })
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+        })
+    }
+
+    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
+        Python::with_gil(|py| {
+            let py_values = PyList::new_bound(
+                py,
+                values
+                    .iter()
+                    .map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
+            );
+            let range_tuple =
+                PyTuple::new_bound(py, vec![range.start.to_object(py), range.end.to_object(py)]);
+            let py_args = PyTuple::new_bound(
+                py,
+                PyTuple::new_bound(py, vec![py_values.as_any(), range_tuple.as_any()]),
+            );
+
+            self.evaluator
+                .bind(py)
+                .call_method1("evaluate", py_args)
+                .and_then(|v| v.extract())
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+        })
+    }
+
+    fn evaluate_all_with_rank(
+        &self,
+        num_rows: usize,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        Python::with_gil(|py| {
+            let ranks = ranks_in_partition
+                .iter()
+                .map(|r| PyTuple::new_bound(py, vec![r.start, r.end]));
+
+            // 1. cast args to Pyarrow array
+            let py_args = vec![num_rows.to_object(py), PyList::new_bound(py, ranks).into()];
+
+            let py_args = PyTuple::new_bound(py, py_args);
+
+            // 2. call function
+            self.evaluator
+                .bind(py)
+                .call_method1("evaluate_all_with_rank", py_args)
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+                .map(|v| {
+                    let array_data = ArrayData::from_pyarrow_bound(&v).unwrap();
+                    make_array(array_data)
+                })
+        })
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("supports_bounded_execution")
+                .and_then(|v| v.extract())
+                .unwrap_or(false)
+        })
+    }
+
+    fn uses_window_frame(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("uses_window_frame")
+                .and_then(|v| v.extract())
+                .unwrap_or(false)
+        })
+    }
+
+    fn include_rank(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("include_rank")
+                .and_then(|v| v.extract())
+                .unwrap_or(false)
+        })
+    }
+}
+
+pub fn to_rust_partition_evaluator(evaluator: PyObject) -> PartitionEvaluatorFactory {
+    Arc::new(move || -> Result<Box<dyn PartitionEvaluator>> {
+        let evaluator = Python::with_gil(|py| evaluator.clone_ref(py));
+        Ok(Box::new(RustPartitionEvaluator::new(evaluator)))
+    })
+}
+
+/// Represents a WindowUDF
+#[pyclass(name = "WindowUDF", module = "datafusion", subclass)]
+#[derive(Debug, Clone)]
+pub struct PyWindowUDF {
+    pub(crate) function: WindowUDF,
+}
+
+#[pymethods]
+impl PyWindowUDF {
+    #[new]
+    #[pyo3(signature=(name, evaluator, input_types, return_type, volatility))]
+    fn new(
+        name: &str,
+        evaluator: PyObject,
+        input_types: Vec<PyArrowType<DataType>>,
+        return_type: PyArrowType<DataType>,
+        volatility: &str,
+    ) -> PyResult<Self> {
+        let return_type = return_type.0;
+        let input_types = input_types.into_iter().map(|t| t.0).collect();
+
+        let function = WindowUDF::from(MultiColumnWindowUDF::new(
+            name,
+            input_types,
+            return_type,
+            parse_volatility(volatility)?,
+            to_rust_partition_evaluator(evaluator),
+        ));
+        Ok(Self { function })
+    }
+
+    /// creates a new PyExpr with the call of the udf
+    #[pyo3(signature = (*args))]
+    fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyExpr> {
+        let args = args.iter().map(|e| e.expr.clone()).collect();
+        Ok(self.function.call(args).into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("WindowUDF({})", self.function.name()))
+    }
+}
+
+pub struct MultiColumnWindowUDF {
+    name: String,
+    signature: Signature,
+    return_type: DataType,
+    partition_evaluator_factory: PartitionEvaluatorFactory,
+}
+
+impl std::fmt::Debug for MultiColumnWindowUDF {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("WindowUDF")
+            .field("name", &self.name)
+            .field("signature", &self.signature)
+            .field("return_type", &"")
+            .field("partition_evaluator_factory", &"")
+            .finish()
+    }
+}
+
+impl MultiColumnWindowUDF {
+    pub fn new(
+        name: impl Into<String>,
+        input_types: Vec<DataType>,
+        return_type: DataType,
+        volatility: Volatility,
+        partition_evaluator_factory: PartitionEvaluatorFactory,
+    ) -> Self {
+        let name = name.into();
+        let signature = Signature::exact(input_types, volatility);
+        Self {
+            name,
+            signature,
+            return_type,
+            partition_evaluator_factory,
+        }
+    }
+}
+
+impl WindowUDFImpl for MultiColumnWindowUDF {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(self.return_type.clone())
+    }
+
+    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+        (self.partition_evaluator_factory)()
+    }
+}