From 94470d8c97925aab4f9dbc3c437169dded2ca8e6 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 20 Sep 2024 10:40:01 -0400 Subject: [PATCH 01/11] Adding PyWindowUDF and implementing PartitionEvaluator for it. Still requires python side work. --- src/context.rs | 6 ++ src/lib.rs | 1 + src/udwf.rs | 239 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 246 insertions(+) create mode 100644 src/udwf.rs diff --git a/src/context.rs b/src/context.rs index 79db2e65..584823e1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -42,6 +42,7 @@ use crate::sql::logical::PyLogicalPlan; use crate::store::StorageContexts; use crate::udaf::PyAggregateUDF; use crate::udf::PyScalarUDF; +use crate::udwf::PyWindowUDF; use crate::utils::{get_tokio_runtime, wait_for_future}; use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::arrow::pyarrow::PyArrowType; @@ -746,6 +747,11 @@ impl PySessionContext { Ok(()) } + pub fn register_udwf(&mut self, udwf: PyWindowUDF) -> PyResult<()> { + self.ctx.register_udwf(udwf.function); + Ok(()) + } + #[pyo3(signature = (name="datafusion"))] pub fn catalog(&self, name: &str) -> PyResult { match self.ctx.catalog(name) { diff --git a/src/lib.rs b/src/lib.rs index e4cc2407..782121e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ pub mod substrait; mod udaf; #[allow(clippy::borrow_deref_ref)] mod udf; +mod udwf; pub mod utils; #[cfg(feature = "mimalloc")] diff --git a/src/udwf.rs b/src/udwf.rs new file mode 100644 index 00000000..47065866 --- /dev/null +++ b/src/udwf.rs @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
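+
+// This module exposes DataFusion window UDFs to Python. The
+// `RustPartitionEvaluator` below implements the `PartitionEvaluator` trait by
+// holding a Python object, acquiring the GIL in each trait method, and
+// forwarding the call to a Python method of the same name, converting Arrow
+// data to and from pyarrow along the way.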
+
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{make_array, Array, ArrayData, ArrayRef};
+use datafusion::logical_expr::window_state::WindowAggState;
+use datafusion::prelude::create_udwf;
+use datafusion::scalar::ScalarValue;
+use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
+
+use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::{PartitionEvaluator, PartitionEvaluatorFactory, WindowUDF};
+use pyo3::types::{PyList, PyTuple};
+
+use crate::expr::PyExpr;
+use crate::utils::parse_volatility;
+
+#[derive(Debug)]
+struct RustPartitionEvaluator {
+    evaluator: PyObject,
+}
+
+impl RustPartitionEvaluator {
+    fn new(evaluator: PyObject) -> Self {
+        Self { evaluator }
+    }
+}
+
+impl PartitionEvaluator for RustPartitionEvaluator {
+    fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
+        Python::with_gil(|py| self.evaluator.bind(py).call_method0("memoize").map(|_| ()))
+            .map_err(|e| DataFusionError::Execution(format!("{e}")))
+    }
+
+    fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
+        Python::with_gil(|py| {
+            let py_args = vec![idx.to_object(py), n_rows.to_object(py)];
+            let py_args = PyTuple::new_bound(py, py_args);
+
+            self.evaluator
+                .bind(py)
+                .call_method1("get_range", py_args)
+                .and_then(|v| {
+                    let tuple: Bound<'_, PyTuple> = v.extract()?;
+                    if tuple.len() != 2 {
+                        return Err(PyValueError::new_err(format!(
+                            "Expected get_range to return tuple of length 2. Received length {}",
+                            tuple.len()
+                        )));
+                    }
+
+                    let start: usize = tuple.get_item(0).unwrap().extract()?;
+                    let end: usize = tuple.get_item(1).unwrap().extract()?;
+
+                    Ok(Range { start, end })
+                })
+        })
+        .map_err(|e| DataFusionError::Execution(format!("{e}")))
+    }
+
+    fn is_causal(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("is_causal")
+                .and_then(|v| v.extract())
+        })
+        .unwrap_or(false)
+    }
+
+    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+        Python::with_gil(|py| {
+            // 1. cast args to Pyarrow array
+            let mut py_args = values
+                .iter()
+                .map(|arg| arg.into_data().to_pyarrow(py).unwrap())
+                .collect::<Vec<_>>();
+            py_args.push(num_rows.to_object(py));
+            let py_args = PyTuple::new_bound(py, py_args);
+
+            // 2. call function
+            self.evaluator
+                .bind(py)
+                .call_method1("evaluate_all", py_args)
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+                .map(|v| {
+                    let array_data = ArrayData::from_pyarrow_bound(&v).unwrap();
+                    make_array(array_data)
+                })
+        })
+    }
+
+    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
+        Python::with_gil(|py| {
+            // 1. cast args to Pyarrow array
+            let mut py_args = values
+                .iter()
+                .map(|arg| arg.into_data().to_pyarrow(py).unwrap())
+                .collect::<Vec<_>>();
+            py_args.push(range.start.to_object(py));
+            py_args.push(range.end.to_object(py));
+            let py_args = PyTuple::new_bound(py, py_args);
+
+            // 2. call function
+            self.evaluator
+                .bind(py)
+                .call_method1("evaluate", py_args)
+                .and_then(|v| v.extract())
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+        })
+    }
+
+    fn evaluate_all_with_rank(
+        &self,
+        num_rows: usize,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        Python::with_gil(|py| {
+            let ranks = ranks_in_partition
+                .iter()
+                .map(|r| PyTuple::new_bound(py, vec![r.start, r.end]));
+
+            // 1. cast args to Pyarrow array
+            let py_args = vec![num_rows.to_object(py), PyList::new_bound(py, ranks).into()];
+
+            let py_args = PyTuple::new_bound(py, py_args);
+
+            // 2. call function
+            self.evaluator
+                .bind(py)
+                .call_method1("evaluate_all_with_rank", py_args)
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+                .map(|v| {
+                    let array_data = ArrayData::from_pyarrow_bound(&v).unwrap();
+                    make_array(array_data)
+                })
+        })
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("supports_bounded_execution")
+                .and_then(|v| v.extract())
+        })
+        .unwrap_or(false)
+    }
+
+    fn uses_window_frame(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("uses_window_frame")
+                .and_then(|v| v.extract())
+        })
+        .unwrap_or(false)
+    }
+
+    fn include_rank(&self) -> bool {
+        Python::with_gil(|py| {
+            self.evaluator
+                .bind(py)
+                .call_method0("include_rank")
+                .and_then(|v| v.extract())
+        })
+        .unwrap_or(false)
+    }
+}
+
+pub fn to_rust_partition_evaluator(evalutor: PyObject) -> PartitionEvaluatorFactory {
+    Arc::new(move || -> Result<Box<dyn PartitionEvaluator>> {
+        let evalutor = Python::with_gil(|py| {
+            evalutor
+                .call0(py)
+                .map_err(|e| DataFusionError::Execution(format!("{e}")))
+        })?;
+        Ok(Box::new(RustPartitionEvaluator::new(evalutor)))
+    })
+}
+
+/// Represents a WindowUDF
+#[pyclass(name = "WindowUDF", module = "datafusion", subclass)]
+#[derive(Debug, Clone)]
+pub struct PyWindowUDF {
+    pub(crate) function: WindowUDF,
+}
+
+#[pymethods]
+impl PyWindowUDF {
+    #[new]
+    #[pyo3(signature=(name, evaluator, input_type, return_type, volatility))]
+    fn new(
+        name: &str,
+        evaluator: PyObject,
+        input_type: PyArrowType<DataType>,
+        return_type: PyArrowType<DataType>,
+        volatility: &str,
+    ) -> PyResult<Self> {
+        let function = create_udwf(
+            name,
+            input_type.0,
+            Arc::new(return_type.0),
+            parse_volatility(volatility)?,
+            to_rust_partition_evaluator(evaluator),
+        );
+        Ok(Self { function })
+    }
+
+    /// Creates a new PyExpr with the call of the UDF
+    #[pyo3(signature = (*args))]
+    fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyExpr> {
+        let args = args.iter().map(|e| e.expr.clone()).collect();
+        Ok(self.function.call(args).into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("WindowUDF({})", self.function.name()))
+    }
+}

From f306dde23bde648dddb3b079321983fe1a100dca Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Fri, 20 Sep 2024 20:55:09 -0400
Subject: [PATCH 02/11] Add python wrappers for UDWF

---
 python/datafusion/__init__.py |   5 +-
 python/datafusion/context.py  |   7 +-
 python/datafusion/udf.py      | 215 ++++++++++++++++++++++++++++++++++
 src/lib.rs                    |   1 +
 src/udwf.rs                   |  17 ++-
 5 files changed, 233 insertions(+), 12 deletions(-)

diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 08ca3fe0..4f40b208 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -40,7 +40,7 @@

 from .record_batch import RecordBatchStream, RecordBatch

-from .udf import ScalarUDF, AggregateUDF, Accumulator
+from .udf import ScalarUDF, AggregateUDF, Accumulator, WindowUDF

 from .common import (
     DFSchema,
@@ -78,6 +78,7 @@
     "Database",
     "Table",
     "AggregateUDF",
+    "WindowUDF",
     "LogicalPlan",
     "ExecutionPlan",
     "RecordBatch",
@@ -113,3 +114,5 @@ def lit(value):
 udf = ScalarUDF.udf

 udaf = AggregateUDF.udaf
+
+udwf = WindowUDF.udwf

diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 35a40ccd..59d053e1 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -25,12 +25,11 @@
 from ._internal import SessionContext as SessionContextInternal
 from ._internal import LogicalPlan, ExecutionPlan

-from datafusion._internal import AggregateUDF
 from datafusion.catalog import Catalog, Table
 from datafusion.dataframe import DataFrame
 from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
-from datafusion.udf import ScalarUDF
+from datafusion.udf import ScalarUDF, AggregateUDF, WindowUDF

 from typing import Any, TYPE_CHECKING
 from typing_extensions import deprecated
@@ -833,6 +832,10 @@ def register_udaf(self, udaf: AggregateUDF) -> None:
         """Register a user-defined aggregation function (UDAF) with the context."""
         self.ctx.register_udaf(udaf._udaf)

+    def register_udwf(self, udwf: WindowUDF) -> None:
+        """Register a user-defined window function (UDWF) with the context."""
+        self.ctx.register_udwf(udwf._udwf)
+
     def catalog(self, name: str = "datafusion") -> Catalog:
         """Retrieve a catalog by name."""
         return self.ctx.catalog(name)

diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py
index f74d675e..2520ceb1 100644
--- a/python/datafusion/udf.py
+++ b/python/datafusion/udf.py
@@ -246,3 +246,218 @@ def udaf(
         state_type=state_type,
         volatility=volatility,
     )
+
+
+class WindowEvaluator(metaclass=ABCMeta):
+    """Evaluator class for user defined window functions (UDWF).
+
+    Users should inherit from this class and implement ``evaluate``, ``evaluate_all``,
+    and/or ``evaluate_all_with_rank``. If using `evaluate` only you will need to
+    override ``supports_bounded_execution``.
+    """
+
+    def memoize(self) -> None:
+        """Perform a memoize operation to improve performance.
+
+        When the window frame has a fixed beginning (e.g. UNBOUNDED
+        PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
+        NTH_VALUE do not need the (unbounded) input once they have
+        seen a certain amount of input.
+
+        `memoize` is called after each input batch is processed, and
+        such functions can save whatever they need.
+        """
+        pass
+
+    def get_range(self, idx: int, n_rows: int) -> tuple[int, int]:
+        """Return the range for the window function.
+
+        If the `uses_window_frame` flag is `false`, this method is used to
+        calculate the required range for the window function during
+        stateful execution.
+
+        Generally there is no required range, hence by default this
+        returns the smallest range (the current row): e.g. seeing the
+        current row is enough to calculate the window result (such as
+        row_number, rank, etc.)
+
+        Args:
+            idx: Current index
+            n_rows: Number of rows.
+        """
+        return (idx, idx + 1)
+
+    def is_causal(self) -> bool:
+        """Get whether evaluator needs future data for its result."""
+        return False
+
+    def evaluate_all(self, values: pyarrow.Array, num_rows: int) -> pyarrow.Array:
+        """Evaluate a window function on an entire input partition.
+
+        This function is called once per input *partition* for window
+        functions that *do not use* values from the window frame, such as
+        `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, `CUME_DIST`,
+        `LEAD`, and `LAG`.
+
+        It produces the result of all rows in a single pass. It
+        expects to receive the entire partition as the `value` and
+        must produce an output column with one output row for every
+        input row.
+
+        `num_rows` is required to correctly compute the output in case
+        `values` contains no rows.
+
+        Implementing this function is an optimization: certain window
+        functions are not affected by the window frame definition or
+        the query doesn't have a frame, and `evaluate_all` skips the
+        (costly) window frame boundary calculation and the overhead of
+        calling `evaluate` for each output row.
+
+        For example, the `LAG` built-in window function does not use
+        the values of its window frame (it can be computed in one shot
+        on the entire partition with `Self::evaluate_all` regardless of the
+        window defined in the `OVER` clause)
+
+        ```sql
+        lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
+        ```
+
+        However, `avg()` computes the average in the window and thus
+        does use its window frame
+
+        ```sql
+        avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
+        ```
+        """
+        if self.supports_bounded_execution() and not self.uses_window_frame():
+            res = []
+            for idx in range(0, num_rows):
+                res.append(self.evaluate(values, self.get_range(idx, num_rows)))
+            return pyarrow.array(res)
+        else:
+            raise
+
+    @abstractmethod
+    def evaluate(self, values: pyarrow.Array, range: tuple[int, int]) -> pyarrow.Scalar:
+        """Evaluate window function on a range of rows in an input partition.
+
+        This is the simplest and most general function to implement
+        but also the least performant as it creates output one row at
+        a time. It is typically much faster to implement stateful
+        evaluation using one of the other specialized methods on this
+        class.
+
+        Returns a `pyarrow.Scalar` that is the value of the window
+        function within `range` for the entire partition. Argument
+        `values` contains the evaluation result of function arguments
+        and evaluation results of ORDER BY expressions. If function has a
+        single argument, `values[1..]` will contain ORDER BY expression results.
+        """
+        pass
+
+    @abstractmethod
+    def evaluate_all_with_rank(
+        self, num_rows: int, ranks_in_partition: list[tuple[int, int]]
+    ) -> pyarrow.Array:
+        """Called for window functions that only need the rank of a row.
+
+        Evaluate the partition evaluator against the partition using
+        the row ranks. For example, `RANK(col)` produces
+
+        ```text
+        col | rank
+        --- + ----
+         A  | 1
+         A  | 1
+         C  | 3
+         D  | 4
+         D  | 4
+        ```
+
+        For this case, `num_rows` would be `5` and
+        `ranks_in_partition` would contain
+
+        ```text
+        [
+            (0,1),
+            (2,2),
+            (3,4),
+        ]
+        """
+        pass
+
+    def supports_bounded_execution(self) -> bool:
+        """Can the window function be incrementally computed using bounded memory?"""
+        return False
+
+    def uses_window_frame(self) -> bool:
+        """Does the window function use the values from the window frame?"""
+        return False
+
+    def include_rank(self) -> bool:
+        """Can this function be evaluated with (only) rank?"""
+        return False
+
+
+class WindowUDF:
+    """Class for performing window user defined functions (UDF).
+
+    Window UDFs operate on a partition of rows. See
+    also :py:class:`ScalarUDF` for operating on a row by row basis.
+    """
+
+    def __init__(
+        self,
+        name: str | None,
+        func: WindowEvaluator,
+        input_type: pyarrow.DataType,
+        return_type: _R,
+        volatility: Volatility | str,
+    ) -> None:
+        """Instantiate a user defined window function (UDWF).
+
+        See :py:func:`udwf` for a convenience function and argument
+        descriptions.
+        """
+        self._udwf = df_internal.WindowUDF(
+            name, func, input_type, return_type, str(volatility)
+        )
+
+    def __call__(self, *args: Expr) -> Expr:
+        """Execute the UDWF.
+ + This function is not typically called by an end user. These calls will + occur during the evaluation of the dataframe. + """ + args_raw = [arg.expr for arg in args] + return Expr(self._udwf.__call__(*args_raw)) + + @staticmethod + def udwf( + func: Callable[..., _R], + input_type: pyarrow.DataType, + return_type: _R, + volatility: Volatility | str, + name: str | None = None, + ) -> WindowUDF: + """Create a new User Defined Window Function. + + Args: + func: The python function. + input_type: The data type of the arguments to ``func``. + return_type: The data type of the return value. + volatility: See :py:class:`Volatility` for allowed values. + name: A descriptive name for the function. + + Returns: + A user defined window function. + """ + if name is None: + name = func.__qualname__.lower() + return WindowUDF( + name=name, + func=func, + input_type=input_type, + return_type=return_type, + volatility=volatility, + ) diff --git a/src/lib.rs b/src/lib.rs index 782121e7..98821833 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,6 +91,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/udwf.rs b/src/udwf.rs index 47065866..651c1e32 100644 --- a/src/udwf.rs +++ b/src/udwf.rs @@ -83,13 +83,12 @@ impl PartitionEvaluator for RustPartitionEvaluator { .bind(py) .call_method0("is_causal") .and_then(|v| v.extract()) + .unwrap_or(false) }) - .unwrap_or(false) } fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result { Python::with_gil(|py| { - // 1. cast args to Pyarrow array let mut py_args = values .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()) @@ -97,15 +96,14 @@ impl PartitionEvaluator for RustPartitionEvaluator { py_args.push(num_rows.to_object(py)); let py_args = PyTuple::new_bound(py, py_args); - // 2. call function self.evaluator .bind(py) .call_method1("evaluate_all", py_args) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) .map(|v| { let array_data = ArrayData::from_pyarrow_bound(&v).unwrap(); make_array(array_data) }) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) }) } @@ -116,8 +114,9 @@ impl PartitionEvaluator for RustPartitionEvaluator { .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()) .collect::>(); - py_args.push(range.start.to_object(py)); - py_args.push(range.end.to_object(py)); + let range_tuple = + PyTuple::new_bound(py, vec![range.start.to_object(py), range.end.to_object(py)]); + py_args.push(range_tuple.into()); let py_args = PyTuple::new_bound(py, py_args); // 2. 
call function @@ -162,8 +161,8 @@ impl PartitionEvaluator for RustPartitionEvaluator { .bind(py) .call_method0("supports_bounded_execution") .and_then(|v| v.extract()) + .unwrap_or(false) }) - .unwrap_or(false) } fn uses_window_frame(&self) -> bool { @@ -172,8 +171,8 @@ impl PartitionEvaluator for RustPartitionEvaluator { .bind(py) .call_method0("uses_window_frame") .and_then(|v| v.extract()) + .unwrap_or(false) }) - .unwrap_or(false) } fn include_rank(&self) -> bool { @@ -182,8 +181,8 @@ impl PartitionEvaluator for RustPartitionEvaluator { .bind(py) .call_method0("include_rank") .and_then(|v| v.extract()) + .unwrap_or(false) }) - .unwrap_or(false) } } From a661ef9898e9f20bce41b7365d6302eb01a47a40 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 21 Sep 2024 09:11:46 -0400 Subject: [PATCH 03/11] adding unit tests for user defined window functions --- python/datafusion/tests/test_udwf.py | 105 +++++++++++++++++++++++++++ python/datafusion/udf.py | 43 ++++++----- 2 files changed, 130 insertions(+), 18 deletions(-) create mode 100644 python/datafusion/tests/test_udwf.py diff --git a/python/datafusion/tests/test_udwf.py b/python/datafusion/tests/test_udwf.py new file mode 100644 index 00000000..1c4d2cf9 --- /dev/null +++ b/python/datafusion/tests/test_udwf.py @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
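+
+# These tests exercise the new Python UDWF bindings end to end: a small
+# exponential smoothing evaluator is registered through `udwf` and invoked
+# via plain, partitioned, and ordered window expressions, with the expected
+# values below computed by hand.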
+ +import pyarrow as pa +import pytest + +from datafusion import SessionContext, column, udwf, lit, functions as f +from datafusion.udf import WindowEvaluator + + +class ExponentialSmooth(WindowEvaluator): + """Interface of a user-defined accumulation.""" + + def __init__(self) -> None: + self.alpha = 0.9 + + def evaluate_all(self, values: pa.Array, num_rows: int) -> pa.Array: + results = [] + curr_value = 0.0 + for idx in range(num_rows): + if idx == 0: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + results.append(curr_value) + + return pa.array(results) + + +class NotSubclassOfWindowEvaluator: + pass + + +@pytest.fixture +def df(): + ctx = SessionContext() + + # create a RecordBatch and a new DataFrame from it + batch = pa.RecordBatch.from_arrays( + [ + pa.array([0, 1, 2, 3, 4, 5, 6]), + pa.array([7, 4, 3, 8, 9, 1, 6]), + pa.array(["A", "A", "A", "A", "B", "B", "B"]), + ], + names=["a", "b", "c"], + ) + return ctx.create_dataframe([[batch]]) + + +def test_udwf_errors(df): + with pytest.raises(TypeError): + udwf( + NotSubclassOfWindowEvaluator, + pa.float64(), + pa.float64(), + volatility="immutable", + ) + + +smooth = udwf( + ExponentialSmooth, + pa.float64(), + pa.float64(), + volatility="immutable", +) + +data_test_udwf_functions = [ + ("smooth_udwf", smooth(column("a")), [0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889]), + ( + "partitioned_udwf", + smooth(column("a")).partition_by(column("c")).build(), + [0, 0.9, 1.89, 2.889, 4.0, 4.9, 5.89], + ), + ( + "ordered_udwf", + smooth(column("a")).order_by(column("b")).build(), + [0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513], + ), +] + + +@pytest.mark.parametrize("name,expr,expected", data_test_udwf_functions) +def test_udwf_functions(df, name, expr, expected): + df = df.select("a", f.round(expr, lit(3)).alias(name)) + + # execute and collect the first (and only) batch + result = df.sort(column("a")).select(column(name)).collect()[0] + + assert result.column(0) == pa.array(expected) diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py index 2520ceb1..a710b905 100644 --- a/python/datafusion/udf.py +++ b/python/datafusion/udf.py @@ -21,7 +21,7 @@ import datafusion._internal as df_internal from datafusion.expr import Expr -from typing import Callable, TYPE_CHECKING, TypeVar +from typing import Callable, TYPE_CHECKING, TypeVar, Type from abc import ABCMeta, abstractmethod from typing import List from enum import Enum @@ -251,10 +251,15 @@ def udaf( class WindowEvaluator(metaclass=ABCMeta): """Evaluator class for user defined window functions (UDWF). - Users should inherit from this class and implement ``evaluate``, ``evaluate_all``, - and/or ``evaluate_all_with_rank``. If using `evaluate` only you will need to - override ``supports_bounded_execution``. - """ + It is up to the user to decide which evaluate function is appropriate. + + |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| + |---|---|----|----| + |False (default) |False (default) |False (default) | ``evaluate_all`` | + |False |True |False | ``evaluate`` | + |False |True/False |True | ``evaluate_all_with_rank`` | + |True |True/False |True/False | ``evaluate`` | + """ # noqa: W505 def memoize(self) -> None: """Perform a memoize operation to improve performance. 
@@ -329,15 +334,8 @@ def evaluate_all(self, values: pyarrow.Array, num_rows: int) -> pyarrow.Array: avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) ``` """ - if self.supports_bounded_execution() and not self.uses_window_frame(): - res = [] - for idx in range(0, num_rows): - res.append(self.evaluate(values, self.get_range(idx, num_rows))) - return pyarrow.array(res) - else: - raise + pass - @abstractmethod def evaluate(self, values: pyarrow.Array, range: tuple[int, int]) -> pyarrow.Scalar: """Evaluate window function on a range of rows in an input partition. @@ -355,7 +353,6 @@ def evaluate(self, values: pyarrow.Array, range: tuple[int, int]) -> pyarrow.Sca """ pass - @abstractmethod def evaluate_all_with_rank( self, num_rows: int, ranks_in_partition: list[tuple[int, int]] ) -> pyarrow.Array: @@ -383,6 +380,8 @@ def evaluate_all_with_rank( (2,2), (3,4), ] + + The user must implement this method if ``include_rank`` returns True. """ pass @@ -399,6 +398,10 @@ def include_rank(self) -> bool: return False +if TYPE_CHECKING: + _W = TypeVar("_W", bound=WindowEvaluator) + + class WindowUDF: """Class for performing window user defined functions (UDF). @@ -409,9 +412,9 @@ class WindowUDF: def __init__( self, name: str | None, - func: WindowEvaluator, + func: Type[WindowEvaluator], input_type: pyarrow.DataType, - return_type: _R, + return_type: pyarrow.DataType, volatility: Volatility | str, ) -> None: """Instantiate a user defined window function (UDWF). @@ -434,9 +437,9 @@ def __call__(self, *args: Expr) -> Expr: @staticmethod def udwf( - func: Callable[..., _R], + func: Type[WindowEvaluator], input_type: pyarrow.DataType, - return_type: _R, + return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None, ) -> WindowUDF: @@ -452,6 +455,10 @@ def udwf( Returns: A user defined window function. 
""" + if not issubclass(func, WindowEvaluator): + raise TypeError( + "`func` must implement the abstract base class WindowEvaluator" + ) if name is None: name = func.__qualname__.lower() return WindowUDF( From f530f0e5dc4b97ad66fc3dcc379973a25f416197 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 21 Sep 2024 10:33:03 -0400 Subject: [PATCH 04/11] Change udwf() to take an instance rather than a class so we can parameterize it --- python/datafusion/tests/test_udwf.py | 8 ++++---- python/datafusion/udf.py | 10 +++++----- src/udwf.rs | 10 +++------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/python/datafusion/tests/test_udwf.py b/python/datafusion/tests/test_udwf.py index 1c4d2cf9..b72ebedb 100644 --- a/python/datafusion/tests/test_udwf.py +++ b/python/datafusion/tests/test_udwf.py @@ -25,8 +25,8 @@ class ExponentialSmooth(WindowEvaluator): """Interface of a user-defined accumulation.""" - def __init__(self) -> None: - self.alpha = 0.9 + def __init__(self, alpha: float) -> None: + self.alpha = alpha def evaluate_all(self, values: pa.Array, num_rows: int) -> pa.Array: results = [] @@ -66,7 +66,7 @@ def df(): def test_udwf_errors(df): with pytest.raises(TypeError): udwf( - NotSubclassOfWindowEvaluator, + NotSubclassOfWindowEvaluator(), pa.float64(), pa.float64(), volatility="immutable", @@ -74,7 +74,7 @@ def test_udwf_errors(df): smooth = udwf( - ExponentialSmooth, + ExponentialSmooth(0.9), pa.float64(), pa.float64(), volatility="immutable", diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py index a710b905..f2822c38 100644 --- a/python/datafusion/udf.py +++ b/python/datafusion/udf.py @@ -21,7 +21,7 @@ import datafusion._internal as df_internal from datafusion.expr import Expr -from typing import Callable, TYPE_CHECKING, TypeVar, Type +from typing import Callable, TYPE_CHECKING, TypeVar from abc import ABCMeta, abstractmethod from typing import List from enum import Enum @@ -412,7 +412,7 @@ class WindowUDF: def __init__( self, name: str | None, - func: Type[WindowEvaluator], + func: WindowEvaluator, input_type: pyarrow.DataType, return_type: pyarrow.DataType, volatility: Volatility | str, @@ -437,7 +437,7 @@ def __call__(self, *args: Expr) -> Expr: @staticmethod def udwf( - func: Type[WindowEvaluator], + func: WindowEvaluator, input_type: pyarrow.DataType, return_type: pyarrow.DataType, volatility: Volatility | str, @@ -455,12 +455,12 @@ def udwf( Returns: A user defined window function. 
""" - if not issubclass(func, WindowEvaluator): + if not isinstance(func, WindowEvaluator): raise TypeError( "`func` must implement the abstract base class WindowEvaluator" ) if name is None: - name = func.__qualname__.lower() + name = func.__class__.__qualname__.lower() return WindowUDF( name=name, func=func, diff --git a/src/udwf.rs b/src/udwf.rs index 651c1e32..53a46569 100644 --- a/src/udwf.rs +++ b/src/udwf.rs @@ -186,14 +186,10 @@ impl PartitionEvaluator for RustPartitionEvaluator { } } -pub fn to_rust_partition_evaluator(evalutor: PyObject) -> PartitionEvaluatorFactory { +pub fn to_rust_partition_evaluator(evaluator: PyObject) -> PartitionEvaluatorFactory { Arc::new(move || -> Result> { - let evalutor = Python::with_gil(|py| { - evalutor - .call0(py) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) - })?; - Ok(Box::new(RustPartitionEvaluator::new(evalutor))) + let evaluator = Python::with_gil(|py| evaluator.clone_ref(py)); + Ok(Box::new(RustPartitionEvaluator::new(evaluator))) }) } From d63f84fecbf08b3bd8745851ecec8317853ff552 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 22 Sep 2024 09:15:59 -0400 Subject: [PATCH 05/11] Pass multiple arrays for udwf evaluate so we can capture the order_by and also multiple columns --- python/datafusion/tests/test_udwf.py | 168 +++++++++++++++++++++++++-- python/datafusion/udf.py | 10 +- src/udwf.rs | 36 +++--- 3 files changed, 185 insertions(+), 29 deletions(-) diff --git a/python/datafusion/tests/test_udwf.py b/python/datafusion/tests/test_udwf.py index b72ebedb..c9d66305 100644 --- a/python/datafusion/tests/test_udwf.py +++ b/python/datafusion/tests/test_udwf.py @@ -20,17 +20,19 @@ from datafusion import SessionContext, column, udwf, lit, functions as f from datafusion.udf import WindowEvaluator +from datafusion.expr import WindowFrame -class ExponentialSmooth(WindowEvaluator): +class ExponentialSmoothDefault(WindowEvaluator): """Interface of a user-defined accumulation.""" def __init__(self, alpha: float) -> None: self.alpha = alpha - def evaluate_all(self, values: pa.Array, num_rows: int) -> pa.Array: + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: results = [] curr_value = 0.0 + values = values[0] for idx in range(num_rows): if idx == 0: curr_value = values[idx].as_py() @@ -43,6 +45,90 @@ def evaluate_all(self, values: pa.Array, num_rows: int) -> pa.Array: return pa.array(results) +class ExponentialSmoothBounded(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def supports_bounded_execution(self) -> bool: + return True + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + # Ovrerride the default range of current row since uses_window_frame is False + # So for the purpose of this test we just smooth from the previous row to + # current. 
+ if idx == 0: + return (0, 0) + return (idx - 1, idx) + + def evaluate( + self, values: list[pa.Array], eval_range: tuple[int, int] + ) -> pa.Scalar: + (start, stop) = eval_range + curr_value = 0.0 + values = values[0] + for idx in range(start, stop + 1): + if idx == start: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + return pa.scalar(curr_value).cast(pa.float64()) + + +class ExponentialSmoothRank(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def include_rank(self) -> bool: + return True + + def evaluate_all_with_rank( + self, num_rows: int, ranks_in_partition: list[tuple[int, int]] + ) -> pa.Array: + results = [] + for idx in range(num_rows): + if idx == 0: + prior_value = 1.0 + matching_row = [ + i + for i in range(len(ranks_in_partition)) + if ranks_in_partition[i][0] <= idx and ranks_in_partition[i][1] > idx + ][0] + 1 + curr_value = matching_row * self.alpha + prior_value * (1.0 - self.alpha) + results.append(curr_value) + prior_value = matching_row + + return pa.array(results) + + +class ExponentialSmoothFrame(WindowEvaluator): + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def uses_window_frame(self) -> bool: + return True + + def evaluate( + self, values: list[pa.Array], eval_range: tuple[int, int] + ) -> pa.Scalar: + (start, stop) = eval_range + curr_value = 0.0 + if len(values) > 1: + order_by = values[1] # noqa: F841 + values = values[0] + else: + values = values[0] + for idx in range(start, stop): + if idx == start: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + return pa.scalar(curr_value).cast(pa.float64()) + + class NotSubclassOfWindowEvaluator: pass @@ -73,23 +159,83 @@ def test_udwf_errors(df): ) -smooth = udwf( - ExponentialSmooth(0.9), +smooth_default = udwf( + ExponentialSmoothDefault(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_bounded = udwf( + ExponentialSmoothBounded(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_rank = udwf( + ExponentialSmoothRank(0.9), + pa.utf8(), + pa.float64(), + volatility="immutable", +) + +smooth_frame = udwf( + ExponentialSmoothFrame(0.9), pa.float64(), pa.float64(), volatility="immutable", ) data_test_udwf_functions = [ - ("smooth_udwf", smooth(column("a")), [0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889]), ( - "partitioned_udwf", - smooth(column("a")).partition_by(column("c")).build(), + "default_udwf", + smooth_default(column("a")), + [0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889], + ), + ( + "default_udwf_partitioned", + smooth_default(column("a")).partition_by(column("c")).build(), [0, 0.9, 1.89, 2.889, 4.0, 4.9, 5.89], ), ( - "ordered_udwf", - smooth(column("a")).order_by(column("b")).build(), + "default_udwf_ordered", + smooth_default(column("a")).order_by(column("b")).build(), + [0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513], + ), + ( + "bounded_udwf", + smooth_bounded(column("a")), + [0, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9], + ), + ( + "bounded_udwf_ignores_frame", + smooth_bounded(column("a")) + .window_frame(WindowFrame("rows", None, None)) + .build(), + [0, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9], + ), + ( + "rank_udwf", + smooth_rank(column("c")).order_by(column("c")).build(), + [1, 1, 1, 1, 1.9, 2, 2], + ), + ( + "frame_unbounded_udwf", + smooth_frame(column("a")).window_frame(WindowFrame("rows", None, None)).build(), + [5.889, 5.889, 5.889, 5.889, 5.889, 
5.889, 5.889],
+    ),
+    (
+        "frame_bounded_udwf",
+        smooth_frame(column("a")).window_frame(WindowFrame("rows", None, 0)).build(),
+        [0.0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889],
+    ),
+    (
+        "frame_bounded_udwf",
+        smooth_frame(column("a"))
+        .window_frame(WindowFrame("rows", None, 0))
+        .order_by(column("b"))
+        .build(),
+        [0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513],
+    ),
 ]


 @pytest.mark.parametrize("name,expr,expected", data_test_udwf_functions)
 def test_udwf_functions(df, name, expr, expected):
-    df = df.select("a", f.round(expr, lit(3)).alias(name))
-
+    df = df.select("a", "b", f.round(expr, lit(3)).alias(name))
+    df.sort(column("a")).show()
     # execute and collect the first (and only) batch
     result = df.sort(column("a")).select(column(name)).collect()[0]

diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py
index f2822c38..f6976ca5 100644
--- a/python/datafusion/udf.py
+++ b/python/datafusion/udf.py
@@ -274,7 +274,7 @@ def memoize(self) -> None:
         """
         pass

-    def get_range(self, idx: int, n_rows: int) -> tuple[int, int]:
+    def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
         """Return the range for the window function.

         If the `uses_window_frame` flag is `false`, this method is used to
         calculate the required range for the window function during
@@ -288,7 +288,7 @@

         Args:
             idx: Current index
-            n_rows: Number of rows.
+            num_rows: Number of rows.
         """
         return (idx, idx + 1)
@@ -296,7 +296,7 @@ def is_causal(self) -> bool:
         """Get whether evaluator needs future data for its result."""
         return False

-    def evaluate_all(self, values: pyarrow.Array, num_rows: int) -> pyarrow.Array:
+    def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array:
         """Evaluate a window function on an entire input partition.
@@ -336,7 +336,9 @@
         """
         pass

-    def evaluate(self, values: pyarrow.Array, range: tuple[int, int]) -> pyarrow.Scalar:
+    def evaluate(
+        self, values: list[pyarrow.Array], eval_range: tuple[int, int]
+    ) -> pyarrow.Scalar:
         """Evaluate window function on a range of rows in an input partition.

diff --git a/src/udwf.rs b/src/udwf.rs
index 53a46569..cd3359b4 100644
--- a/src/udwf.rs
+++ b/src/udwf.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+use std::num;
 use std::ops::Range;
 use std::sync::Arc;
@@ -89,12 +90,17 @@
     fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
         Python::with_gil(|py| {
-            let mut py_args = values
-                .iter()
-                .map(|arg| arg.into_data().to_pyarrow(py).unwrap())
-                .collect::<Vec<_>>();
-            py_args.push(num_rows.to_object(py));
-            let py_args = PyTuple::new_bound(py, py_args);
+            let py_values = PyList::new_bound(
+                py,
+                values
+                    .iter()
+                    .map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
+            );
+            let py_num_rows = num_rows.to_object(py).into_bound(py);
+            let py_args = PyTuple::new_bound(
+                py,
+                PyTuple::new_bound(py, vec![py_values.as_any(), &py_num_rows]),
+            );

             self.evaluator
                 .bind(py)
                 .call_method1("evaluate_all", py_args)
@@ -109,17 +115,19 @@
     fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
         Python::with_gil(|py| {
-            // 1.
cast args to Pyarrow array - let mut py_args = values - .iter() - .map(|arg| arg.into_data().to_pyarrow(py).unwrap()) - .collect::>(); + let py_values = PyList::new_bound( + py, + values + .iter() + .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), + ); let range_tuple = PyTuple::new_bound(py, vec![range.start.to_object(py), range.end.to_object(py)]); - py_args.push(range_tuple.into()); - let py_args = PyTuple::new_bound(py, py_args); + let py_args = PyTuple::new_bound( + py, + PyTuple::new_bound(py, vec![py_values.as_any(), range_tuple.as_any()]), + ); - // 2. call function self.evaluator .bind(py) .call_method1("evaluate", py_args) From b8baad4e8fa86d26967991c2d66c7bd3127cdb5d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 22 Sep 2024 13:25:09 -0400 Subject: [PATCH 06/11] Update udwf to take multiple input columns --- python/datafusion/tests/test_udwf.py | 49 +++++++++++++++-- python/datafusion/udf.py | 12 +++-- src/udwf.rs | 81 ++++++++++++++++++++++++---- 3 files changed, 125 insertions(+), 17 deletions(-) diff --git a/python/datafusion/tests/test_udwf.py b/python/datafusion/tests/test_udwf.py index c9d66305..aa158454 100644 --- a/python/datafusion/tests/test_udwf.py +++ b/python/datafusion/tests/test_udwf.py @@ -24,8 +24,6 @@ class ExponentialSmoothDefault(WindowEvaluator): - """Interface of a user-defined accumulation.""" - def __init__(self, alpha: float) -> None: self.alpha = alpha @@ -129,6 +127,39 @@ def evaluate( return pa.scalar(curr_value).cast(pa.float64()) +class SmoothTwoColumn(WindowEvaluator): + """This class demonstrates using two columns. + + If the second column is above a threshold, then smooth over the first column from + the previous and next rows. + """ + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + values_a = values[0] + values_b = values[1] + for idx in range(num_rows): + if values_b[idx].as_py() > 7: + if idx == 0: + results.append(values_a[1].cast(pa.float64())) + elif idx == num_rows - 1: + results.append(values_a[num_rows - 2].cast(pa.float64())) + else: + results.append( + pa.scalar( + values_a[idx - 1].as_py() * self.alpha + + values_a[idx + 1].as_py() * (1.0 - self.alpha) + ) + ) + else: + results.append(values_a[idx].cast(pa.float64())) + + return pa.array(results) + + class NotSubclassOfWindowEvaluator: pass @@ -187,6 +218,13 @@ def test_udwf_errors(df): volatility="immutable", ) +smooth_two_col = udwf( + SmoothTwoColumn(0.9), + [pa.int64(), pa.int64()], + pa.float64(), + volatility="immutable", +) + data_test_udwf_functions = [ ( "default_udwf", @@ -238,13 +276,18 @@ def test_udwf_errors(df): .build(), [0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513], ), + ( + "two_column_udwf", + smooth_two_col(column("a"), column("b")), + [0.0, 1.0, 2.0, 2.2, 3.2, 5.0, 6.0], + ), ] @pytest.mark.parametrize("name,expr,expected", data_test_udwf_functions) def test_udwf_functions(df, name, expr, expected): df = df.select("a", "b", f.round(expr, lit(3)).alias(name)) - df.sort(column("a")).show() + # execute and collect the first (and only) batch result = df.sort(column("a")).select(column(name)).collect()[0] diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py index f6976ca5..589841be 100644 --- a/python/datafusion/udf.py +++ b/python/datafusion/udf.py @@ -415,7 +415,7 @@ def __init__( self, name: str | None, func: WindowEvaluator, - input_type: pyarrow.DataType, + input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, 
volatility: Volatility | str, ) -> None: @@ -425,7 +425,7 @@ def __init__( descriptions. """ self._udwf = df_internal.WindowUDF( - name, func, input_type, return_type, str(volatility) + name, func, input_types, return_type, str(volatility) ) def __call__(self, *args: Expr) -> Expr: @@ -440,7 +440,7 @@ def __call__(self, *args: Expr) -> Expr: @staticmethod def udwf( func: WindowEvaluator, - input_type: pyarrow.DataType, + input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None, @@ -449,7 +449,7 @@ def udwf( Args: func: The python function. - input_type: The data type of the arguments to ``func``. + input_types: The data types of the arguments to ``func``. return_type: The data type of the return value. volatility: See :py:class:`Volatility` for allowed values. name: A descriptive name for the function. @@ -463,10 +463,12 @@ def udwf( ) if name is None: name = func.__class__.__qualname__.lower() + if isinstance(input_types, pyarrow.DataType): + input_types = [input_types] return WindowUDF( name=name, func=func, - input_type=input_type, + input_types=input_types, return_type=return_type, volatility=volatility, ) diff --git a/src/udwf.rs b/src/udwf.rs index cd3359b4..31cc5e60 100644 --- a/src/udwf.rs +++ b/src/udwf.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::num; +use std::any::Any; use std::ops::Range; use std::sync::Arc; use arrow::array::{make_array, Array, ArrayData, ArrayRef}; use datafusion::logical_expr::window_state::WindowAggState; -use datafusion::prelude::create_udwf; use datafusion::scalar::ScalarValue; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; @@ -29,7 +28,9 @@ use pyo3::prelude::*; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow}; use datafusion::error::{DataFusionError, Result}; -use datafusion::logical_expr::{PartitionEvaluator, PartitionEvaluatorFactory, WindowUDF}; +use datafusion::logical_expr::{ + PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl, +}; use pyo3::types::{PyList, PyTuple}; use crate::expr::PyExpr; @@ -211,21 +212,24 @@ pub struct PyWindowUDF { #[pymethods] impl PyWindowUDF { #[new] - #[pyo3(signature=(name, evaluator, input_type, return_type, volatility))] + #[pyo3(signature=(name, evaluator, input_types, return_type, volatility))] fn new( name: &str, evaluator: PyObject, - input_type: PyArrowType, + input_types: Vec>, return_type: PyArrowType, volatility: &str, ) -> PyResult { - let function = create_udwf( + let return_type = return_type.0; + let input_types = input_types.into_iter().map(|t| t.0).collect(); + + let function = WindowUDF::from(MultiColumnWindowUDF::new( name, - input_type.0, - Arc::new(return_type.0), + input_types, + return_type, parse_volatility(volatility)?, to_rust_partition_evaluator(evaluator), - ); + )); Ok(Self { function }) } @@ -240,3 +244,62 @@ impl PyWindowUDF { Ok(format!("WindowUDF({})", self.function.name())) } } + +pub struct MultiColumnWindowUDF { + name: String, + signature: Signature, + return_type: DataType, + partition_evaluator_factory: PartitionEvaluatorFactory, +} + +impl std::fmt::Debug for MultiColumnWindowUDF { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("WindowUDF") + .field("name", &self.name) + .field("signature", &self.signature) + .field("return_type", &"") + .field("partition_evaluator_factory", 
&"") + .finish() + } +} + +impl MultiColumnWindowUDF { + pub fn new( + name: impl Into, + input_types: Vec, + return_type: DataType, + volatility: Volatility, + partition_evaluator_factory: PartitionEvaluatorFactory, + ) -> Self { + let name = name.into(); + let signature = Signature::exact(input_types, volatility); + Self { + name, + signature, + return_type, + partition_evaluator_factory, + } + } +} + +impl WindowUDFImpl for MultiColumnWindowUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(self.return_type.clone()) + } + + fn partition_evaluator(&self) -> Result> { + (self.partition_evaluator_factory)() + } +} From 326dacd3f6b46e203a00b6178c3c7e3429beec52 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 22 Sep 2024 13:25:45 -0400 Subject: [PATCH 07/11] Add user exampe for UDWF --- examples/python-udwf.py | 270 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 270 insertions(+) create mode 100644 examples/python-udwf.py diff --git a/examples/python-udwf.py b/examples/python-udwf.py new file mode 100644 index 00000000..a320047d --- /dev/null +++ b/examples/python-udwf.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyarrow as pa +import datafusion +from datafusion import udwf, functions as f, col, lit +from datafusion.udf import WindowEvaluator +from datafusion.expr import WindowFrame + +# This example creates five different examples of user defined window functions in order +# to demonstrate the variety of ways a user may need to implement. + + +class ExponentialSmoothDefault(WindowEvaluator): + """Create a running smooth operation across an entire partition at once.""" + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + curr_value = 0.0 + values = values[0] + for idx in range(num_rows): + if idx == 0: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + results.append(curr_value) + + return pa.array(results) + + +class SmoothBoundedFromPreviousRow(WindowEvaluator): + """Smooth over from the previous to current row only.""" + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def supports_bounded_execution(self) -> bool: + return True + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + # Ovrerride the default range of current row since uses_window_frame is False + # So for the purpose of this test we just smooth from the previous row to + # current. 
+ if idx == 0: + return (0, 0) + return (idx - 1, idx) + + def evaluate( + self, values: list[pa.Array], eval_range: tuple[int, int] + ) -> pa.Scalar: + (start, stop) = eval_range + curr_value = 0.0 + values = values[0] + for idx in range(start, stop + 1): + if idx == start: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + return pa.scalar(curr_value).cast(pa.float64()) + + +class SmoothAcrossRank(WindowEvaluator): + """Smooth over the rank from the previous rank to current.""" + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def include_rank(self) -> bool: + return True + + def evaluate_all_with_rank( + self, num_rows: int, ranks_in_partition: list[tuple[int, int]] + ) -> pa.Array: + results = [] + for idx in range(num_rows): + if idx == 0: + prior_value = 1.0 + matching_row = [ + i + for i in range(len(ranks_in_partition)) + if ranks_in_partition[i][0] <= idx and ranks_in_partition[i][1] > idx + ][0] + 1 + curr_value = matching_row * self.alpha + prior_value * (1.0 - self.alpha) + results.append(curr_value) + prior_value = matching_row + + return pa.array(results) + + +class ExponentialSmoothFrame(WindowEvaluator): + "Find the value across an entire frame using exponential smoothing" + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def uses_window_frame(self) -> bool: + return True + + def evaluate( + self, values: list[pa.Array], eval_range: tuple[int, int] + ) -> pa.Scalar: + (start, stop) = eval_range + curr_value = 0.0 + if len(values) > 1: + order_by = values[1] # noqa: F841 + values = values[0] + else: + values = values[0] + for idx in range(start, stop): + if idx == start: + curr_value = values[idx].as_py() + else: + curr_value = values[idx].as_py() * self.alpha + curr_value * ( + 1.0 - self.alpha + ) + return pa.scalar(curr_value).cast(pa.float64()) + + +class SmoothTwoColumn(WindowEvaluator): + """Smooth once column based on a condition of another column. + + If the second column is above a threshold, then smooth over the first column from + the previous and next rows. 
+ """ + + def __init__(self, alpha: float) -> None: + self.alpha = alpha + + def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array: + results = [] + values_a = values[0] + values_b = values[1] + for idx in range(num_rows): + if not values_b[idx].is_valid: + if idx == 0: + results.append(values_a[1].cast(pa.float64())) + elif idx == num_rows - 1: + results.append(values_a[num_rows - 2].cast(pa.float64())) + else: + results.append( + pa.scalar( + values_a[idx - 1].as_py() * self.alpha + + values_a[idx + 1].as_py() * (1.0 - self.alpha) + ) + ) + else: + results.append(values_a[idx].cast(pa.float64())) + + return pa.array(results) + + +# create a context +ctx = datafusion.SessionContext() + +# create a RecordBatch and a new DataFrame from it +batch = pa.RecordBatch.from_arrays( + [ + pa.array([1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]), + pa.array([1, 2, None, 4, 5, 6, None, 8]), + pa.array(["A", "A", "A", "A", "A", "B", "B", "B"]), + ], + names=["a", "b", "c"], +) +df = ctx.create_dataframe([[batch]]) + +exp_smooth = udwf( + ExponentialSmoothDefault(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_two_row = udwf( + SmoothBoundedFromPreviousRow(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_rank = udwf( + SmoothAcrossRank(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_frame = udwf( + ExponentialSmoothFrame(0.9), + pa.float64(), + pa.float64(), + volatility="immutable", +) + +smooth_two_col = udwf( + SmoothTwoColumn(0.9), + [pa.float64(), pa.int64()], + pa.float64(), + volatility="immutable", +) + +# These are done with separate statements instead of one large `select` because that will +# attempt to combine the window operations and our defined UDFs do not all support that. 
+( + df.with_column("exp_smooth", exp_smooth(col("a"))) + .with_column("smooth_prior_row", smooth_two_row(col("a"))) + .with_column("smooth_rank", smooth_rank(col("a")).order_by(col("c")).build()) + .with_column("smooth_two_col", smooth_two_col(col("a"), col("b"))) + .with_column( + "smooth_frame", + smooth_frame(col("a")).window_frame(WindowFrame("rows", None, 0)).build(), + ) + .select( + "a", + "b", + "c", + "exp_smooth", + "smooth_prior_row", + "smooth_rank", + "smooth_two_col", + "smooth_frame", + ) +).show() + +assert df.select(f.round(exp_smooth(col("a")), lit(3))).collect()[0].column( + 0 +) == pa.array([1, 1.99, 2.809, 3.881, 4.978, 5.898, 6.8, 7.88]) + + +assert df.select(f.round(smooth_two_row(col("a")), lit(3))).collect()[0].column( + 0 +) == pa.array([1.0, 1.99, 2.82, 3.89, 4.99, 5.91, 6.81, 7.89]) + + +assert df.select(smooth_rank(col("a")).order_by(col("c")).build()).collect()[0].column( + 0 +) == pa.array([1, 1, 1, 1, 1, 1.9, 2.0, 2.0]) + + +assert df.select(smooth_two_col(col("a"), col("b"))).collect()[0].column(0) == pa.array( + [1, 2.1, 2.29, 4, 5.1, 6, 6.2, 8.0] +) + + +assert df.select( + f.round( + smooth_frame(col("a")).window_frame(WindowFrame("rows", None, 0)).build(), + lit(3), + ) +).collect()[0].column(0) == pa.array([1, 1.99, 2.809, 3.881, 4.978, 5.898, 6.8, 7.88]) From d03ce13cce6324a1afebf7f4904de0c8e813e5ba Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 23 Sep 2024 08:32:25 -0400 Subject: [PATCH 08/11] Update template for how values are passed to update --- python/datafusion/udf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py index 589841be..9ec5f342 100644 --- a/python/datafusion/udf.py +++ b/python/datafusion/udf.py @@ -152,7 +152,7 @@ def state(self) -> List[pyarrow.Scalar]: pass @abstractmethod - def update(self, values: pyarrow.Array) -> None: + def update(self, *values: pyarrow.Array) -> None: """Evaluate an array of values and update state.""" pass From 3dfa3305d88296c60d3d09330a727713da5e801a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 23 Sep 2024 08:32:43 -0400 Subject: [PATCH 09/11] Add user documentation for UDWF --- .../common-operations/udf-and-udfa.rst | 195 ++++++++++++++++-- 1 file changed, 178 insertions(+), 17 deletions(-) diff --git a/docs/source/user-guide/common-operations/udf-and-udfa.rst b/docs/source/user-guide/common-operations/udf-and-udfa.rst index e9c142f0..6f16b846 100644 --- a/docs/source/user-guide/common-operations/udf-and-udfa.rst +++ b/docs/source/user-guide/common-operations/udf-and-udfa.rst @@ -18,8 +18,21 @@ User Defined Functions ====================== -DataFusion provides powerful expressions and functions, reducing the need for custom Python functions. -However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs), with the :py:func:`~datafusion.udf.ScalarUDF.udf` function. +DataFusion provides powerful expressions and functions, reducing the need for custom Python +functions. However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs). + +Scalar Functions +---------------- + +When writing a user defined function that can operate on a row by row basis, these are called Scalar +Functions. You can define your own scalar function by calling +:py:func:`~datafusion.udf.ScalarUDF.udf` . + +The basic definition of a scalar UDF is a python function that takes one or more +`pyarrow `_ arrays and returns a single array as +output. 
DataFusion scalar UDFs operate on an entire batch of record at a time, though the evaluation +of those records should be on a row by row basis. In the following example, we compute if the input +array contains null values. .. ipython:: python @@ -35,14 +48,70 @@ However you can still incorporate your own functions, i.e. User-Defined Function ctx = datafusion.SessionContext() batch = pyarrow.RecordBatch.from_arrays( - [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])], + [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])], names=["a", "b"], ) df = ctx.create_dataframe([[batch]], name="batch_array") - df.select(is_null_arr(col("a"))).to_pandas() + df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show() + +In the previous example, we used the fact that pyarrow provides a variety of built in array +functions such as ``is_null()``. There are additional pyarrow +`compute functions `_ available. When possible, +it is highly recommended to use these functions because they can perform computations without doing +any copy operations from the original arrays. This leads to greatly improved performance. + +If you need to perform an operation in python that is not available with the pyarrow compute +functions, you will need to convert the record batch into python values, perform your operation, +and construct an array. This operation of converting the built in data type of the array into a +python object can be one of the slowest operations in DataFusion, so it should be done sparingly. + +The following example performs the same operation as before with ``is_null`` but demonstrates +converting to Python objects to do the evaluation. + +.. ipython:: python + + import pyarrow + import datafusion + from datafusion import udf, col + + def is_null(array: pyarrow.Array) -> pyarrow.Array: + results = [] + for value in array: + results.append(value.as_py() == None) + return pyarrow.array(results) + + is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable') + + ctx = datafusion.SessionContext() + + batch = pyarrow.RecordBatch.from_arrays( + [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])], + names=["a", "b"], + ) + df = ctx.create_dataframe([[batch]], name="batch_array") -Additionally the :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined Aggregate Functions (UDAFs) + df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show() + +Aggregate Functions +------------------- + +The :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined +Aggregate Functions (UDAFs). To use this you must implement an +:py:class:`~datafusion.udf.Accumulator` that determines how the aggregation is performed. + +When defining a UDAF there are four methods you need to implement. The ``update`` function takes the +array(s) of input and updates the internal state of the accumulator. You should define this function +to have as many input arguments as you will pass when calling the UDAF. Since aggregation may be +split into multiple batches, we must have a method to combine multiple batches. For this, we have +two functions, ``state`` and ``merge``. ``state`` will return an array of scalar values that contain +the current state of a single batch accumulation. Then we must ``merge`` the results of these +different states. Finally ``evaluate`` is the call that will return the final result after the +``merge`` is complete. 
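+
+Conceptually, the engine creates one accumulator per partial aggregation and
+later combines the partial results. The sketch below is illustrative pseudocode
+of that call sequence only (DataFusion performs these steps natively in Rust);
+the ``run_aggregation`` helper and its arguments are invented here for
+illustration.
+
+.. code-block:: python
+
+    def run_aggregation(partitions, make_accumulator):
+        # Phase 1: feed each partition to its own accumulator via update().
+        partial_states = []
+        for columns in partitions:  # one pyarrow.Array per function argument
+            acc = make_accumulator()
+            acc.update(*columns)
+            partial_states.append(acc.state())
+
+        # Phase 2: a fresh accumulator merges the partial states, and
+        # evaluate() produces the final result.
+        final = make_accumulator()
+        for state in partial_states:
+            final.merge([state])
+        return final.evaluate()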
+
+In the following example we want to define a custom aggregate function that will return the
+difference between the sum of two columns. The state can be represented by a single value and we can
+also see how the inputs to ``update`` and ``merge`` differ.
 
 .. code-block:: python
 
@@ -57,30 +126,122 @@ Additionally the :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows yo
         Interface of a user-defined accumulation.
         """
         def __init__(self):
-            self._sum = pyarrow.scalar(0.0)
+            self._sum = 0.0
 
-        def update(self, values: pyarrow.Array) -> None:
-            # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-            self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
+        def update(self, values_a: pyarrow.Array, values_b: pyarrow.Array) -> None:
+            self._sum = self._sum + pyarrow.compute.sum(values_a).as_py() - pyarrow.compute.sum(values_b).as_py()
 
         def merge(self, states: List[pyarrow.Array]) -> None:
-            # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-            self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states[0]).as_py())
+            self._sum = self._sum + pyarrow.compute.sum(states[0]).as_py()
 
         def state(self) -> pyarrow.Array:
-            return pyarrow.array([self._sum.as_py()])
+            return pyarrow.array([self._sum])
 
         def evaluate(self) -> pyarrow.Scalar:
-            return self._sum
+            return pyarrow.scalar(self._sum)
 
     ctx = datafusion.SessionContext()
     df = ctx.from_pydict(
         {
-            "a": [1, 2, 3],
-            "b": [4, 5, 6],
+            "a": [4, 5, 6],
+            "b": [1, 2, 3],
         }
     )
 
-    my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
+    my_udaf = udaf(MyAccumulator, [pyarrow.float64(), pyarrow.float64()], pyarrow.float64(), [pyarrow.float64()], 'stable')
+
+    df.aggregate([], [my_udaf(col("a"), col("b")).alias("col_diff")])
+
+Window Functions
+----------------
+
+To implement a User-Defined Window Function (UDWF) you must call the
+:py:func:`~datafusion.udf.WindowUDF.udwf` function using a class that implements the abstract
+class :py:class:`~datafusion.udf.WindowEvaluator`.
+
+There are three methods of evaluation of UDWFs.
+
+- ``evaluate`` is the simplest case: you are given an array and are expected to calculate the
+  value for a single row of that array. It is also the least performant option.
+- ``evaluate_all`` computes the values for all rows of an input array in a single pass.
+- ``evaluate_all_with_rank`` computes the values for all rows, but you only have the rank
+  information for the rows.
+
+Which methods you implement are based upon which of these options are set.
+
+.. list-table:: Title
+   :header-rows: 1
+
+   * - ``uses_window_frame``
+     - ``supports_bounded_execution``
+     - ``include_rank``
+     - function_to_implement
+   * - False (default)
+     - False (default)
+     - False (default)
+     - ``evaluate_all``
+   * - False
+     - True
+     - False
+     - ``evaluate``
+   * - False
+     - True/False
+     - True
+     - ``evaluate_all_with_rank``
+   * - True
+     - True/False
+     - True/False
+     - ``evaluate``
+
+UDWF options
+^^^^^^^^^^^^
+
+When you define your UDWF you can override the functions that return these values. They will
+determine which evaluate functions are called, as the sketch after this list illustrates.
+
+- ``uses_window_frame`` is set for functions that compute based on the specified window frame. If
+  your function depends upon the specified frame, set this to ``True``.
+- ``supports_bounded_execution`` specifies if your function can be incrementally computed.
+- ``include_rank`` is set to ``True`` for window functions that can be computed using only the rank
+  information.
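+
+To make the pairing between these options and the evaluation functions concrete, here is a
+hypothetical sketch (it is not part of this change) that sets ``include_rank`` and therefore
+implements ``evaluate_all_with_rank``. It assumes each entry of ``ranks_in_partition`` is an
+inclusive ``(start, end)`` pair covering one group of rows whose ordering values are equal.
+
+.. code-block:: python
+
+    import pyarrow as pa
+    from datafusion.udf import WindowEvaluator
+
+    class SqlRank(WindowEvaluator):
+        """Sketch of a RANK-style evaluator driven only by rank information."""
+
+        def include_rank(self) -> bool:
+            return True
+
+        def evaluate_all_with_rank(
+            self, num_rows: int, ranks_in_partition: list[tuple[int, int]]
+        ) -> pa.Array:
+            results = []
+            for start, end in ranks_in_partition:
+                # Every row in a group of equal values receives the 1-based
+                # index of the group's first row, which is the SQL RANK.
+                results.extend([start + 1] * (end - start + 1))
+            return pa.array(results)
+
+Because ``include_rank`` returns ``True``, DataFusion will call ``evaluate_all_with_rank`` rather
+than ``evaluate_all`` for this evaluator.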
+
+The next example implements exponential smoothing as a UDWF. All three options keep their default
+``False`` values, so only ``evaluate_all`` needs to be implemented.
+
+.. code-block:: python
+
+    import pyarrow as pa
+    from datafusion import udwf, col, SessionContext
+    from datafusion.udf import WindowEvaluator
+
+    class ExponentialSmooth(WindowEvaluator):
+        def __init__(self, alpha: float) -> None:
+            self.alpha = alpha
+
+        def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
+            results = []
+            curr_value = 0.0
+            values = values[0]
+            for idx in range(num_rows):
+                if idx == 0:
+                    curr_value = values[idx].as_py()
+                else:
+                    curr_value = values[idx].as_py() * self.alpha + curr_value * (
+                        1.0 - self.alpha
+                    )
+                results.append(curr_value)
+
+            return pa.array(results)
+
+    exp_smooth = udwf(
+        ExponentialSmooth(0.9),
+        pa.float64(),
+        pa.float64(),
+        volatility="immutable",
+    )
+
+    ctx = SessionContext()
+
+    df = ctx.from_pydict({
+        "a": [1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]
+    })
 
-    df.aggregate([],[my_udaf(col("a"))])
+    df.select("a", exp_smooth(col("a")).alias("smooth_a")).show()

From 42092f76f8e663fe8adac09d7b90da2dcb0a9a31 Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Sun, 29 Sep 2024 07:59:40 -0400
Subject: [PATCH 10/11] Updating documentation per PR review

---
 .../common-operations/udf-and-udfa.rst        |  17 +--
 examples/python-udwf.py                       |   2 +-
 python/datafusion/tests/test_udwf.py          |   2 +-
 python/datafusion/udf.py                      | 124 ++++++++++--------
 4 files changed, 75 insertions(+), 70 deletions(-)

diff --git a/docs/source/user-guide/common-operations/udf-and-udfa.rst b/docs/source/user-guide/common-operations/udf-and-udfa.rst
index 6f16b846..ffd7a05c 100644
--- a/docs/source/user-guide/common-operations/udf-and-udfa.rst
+++ b/docs/source/user-guide/common-operations/udf-and-udfa.rst
@@ -15,7 +15,7 @@
 .. specific language governing permissions and limitations
 .. under the License.
 
-User Defined Functions
+User-Defined Functions
 ======================
 
 DataFusion provides powerful expressions and functions, reducing the need for custom Python
@@ -24,15 +24,15 @@ functions. However, you can still incorporate your own functions, i.e. User-Defin
 Scalar Functions
 ----------------
 
-When writing a user defined function that can operate on a row by row basis, these are called Scalar
+When writing a user-defined function that can operate on a row by row basis, these are called Scalar
 Functions. You can define your own scalar function by calling
 :py:func:`~datafusion.udf.ScalarUDF.udf`.
 
 The basic definition of a scalar UDF is a Python function that takes one or more
 `pyarrow <https://arrow.apache.org/docs/python/index.html>`_ arrays and returns a single array as
-output. DataFusion scalar UDFs operate on an entire batch of record at a time, though the evaluation
-of those records should be on a row by row basis. In the following example, we compute if the input
-array contains null values.
+output. DataFusion scalar UDFs operate on an entire batch of records at a time, though the
+evaluation of those records should be on a row by row basis. In the following example, we compute
+if the input array contains null values.
 
 .. ipython:: python
 
@@ -76,10 +76,7 @@ converting to Python objects to do the evaluation.
 from datafusion import udf, col
 
     def is_null(array: pyarrow.Array) -> pyarrow.Array:
-        results = []
-        for value in array:
-            results.append(value.as_py() == None)
-        return pyarrow.array(results)
+        return pyarrow.array([value.as_py() is None for value in array])
 
     is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')
 
@@ -169,7 +166,7 @@ There are three methods of evaluation of UDWFs.
 
 Which methods you implement are based upon which of these options are set.
 
-.. list-table:: Title
+.. list-table::
    :header-rows: 1
 
    * - ``uses_window_frame``
diff --git a/examples/python-udwf.py b/examples/python-udwf.py
index a320047d..05b3021d 100644
--- a/examples/python-udwf.py
+++ b/examples/python-udwf.py
@@ -57,7 +57,7 @@ def supports_bounded_execution(self) -> bool:
         return True
 
     def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
-        # Ovrerride the default range of current row since uses_window_frame is False
+        # Override the default range of the current row since uses_window_frame is False
         # So for the purpose of this test we just smooth from the previous row to
         # current.
         if idx == 0:
diff --git a/python/datafusion/tests/test_udwf.py b/python/datafusion/tests/test_udwf.py
index aa158454..67c0979f 100644
--- a/python/datafusion/tests/test_udwf.py
+++ b/python/datafusion/tests/test_udwf.py
@@ -51,7 +51,7 @@ def supports_bounded_execution(self) -> bool:
         return True
 
     def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
-        # Ovrerride the default range of current row since uses_window_frame is False
+        # Override the default range of the current row since uses_window_frame is False
         # So for the purpose of this test we just smooth from the previous row to
         # current.
         if idx == 0:
diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py
index 9ec5f342..4cd7b75b 100644
--- a/python/datafusion/udf.py
+++ b/python/datafusion/udf.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Provides the user defined functions for evaluation of dataframes."""
+"""Provides the user-defined functions for evaluation of dataframes."""
 
 from __future__ import annotations
 
@@ -76,7 +76,7 @@ def __str__(self):
 
 
 class ScalarUDF:
-    """Class for performing scalar user defined functions (UDF).
+    """Class for performing scalar user-defined functions (UDF).
 
     Scalar UDFs operate on a row by row basis. See also :py:class:`AggregateUDF` for
     operating on a group of rows.
@@ -90,7 +90,7 @@ def __init__(
         return_type: _R,
         volatility: Volatility | str,
     ) -> None:
-        """Instantiate a scalar user defined function (UDF).
+        """Instantiate a scalar user-defined function (UDF).
 
         See helper method :py:func:`udf` for argument details.
         """
@@ -115,7 +115,7 @@ def udf(
         volatility: Volatility | str,
         name: str | None = None,
     ) -> ScalarUDF:
-        """Create a new User Defined Function.
+        """Create a new User-Defined Function.
 
         Args:
             func: A callable python function.
@@ -127,7 +127,7 @@ def udf(
             name: A descriptive name for the function.
 
         Returns:
-            A user defined aggregate function, which can be used in either data
-            aggregation or window function calls.
+            A user-defined function that can be used in DataFrame expressions.
         """
         if not callable(func):
@@ -172,7 +172,7 @@ def evaluate(self) -> pyarrow.Scalar:
 
 
 class AggregateUDF:
-    """Class for performing scalar user defined functions (UDF).
+    """Class for performing aggregate user-defined functions (UDAF).
 
     Aggregate UDFs operate on a group of rows and return a single value. See
     also :py:class:`ScalarUDF` for operating on a row by row basis.
@@ -187,7 +187,7 @@ def __init__( state_type: list[pyarrow.DataType], volatility: Volatility | str, ) -> None: - """Instantiate a user defined aggregate function (UDAF). + """Instantiate a user-defined aggregate function (UDAF). See :py:func:`udaf` for a convenience function and argument descriptions. @@ -214,7 +214,7 @@ def udaf( volatility: Volatility | str, name: str | None = None, ) -> AggregateUDF: - """Create a new User Defined Aggregate Function. + """Create a new User-Defined Aggregate Function. The accumulator function must be callable and implement :py:class:`Accumulator`. @@ -227,7 +227,7 @@ def udaf( name: A descriptive name for the function. Returns: - A user defined aggregate function, which can be used in either data + A user-defined aggregate function, which can be used in either data aggregation or window function calls. """ if not issubclass(accum, Accumulator): @@ -249,16 +249,21 @@ def udaf( class WindowEvaluator(metaclass=ABCMeta): - """Evaluator class for user defined window functions (UDWF). + """Evaluator class for user-defined window functions (UDWF). It is up to the user to decide which evaluate function is appropriate. - |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| - |---|---|----|----| - |False (default) |False (default) |False (default) | ``evaluate_all`` | - |False |True |False | ``evaluate`` | - |False |True/False |True | ``evaluate_all_with_rank`` | - |True |True/False |True/False | ``evaluate`` | + +------------------------+--------------------------------+------------------+---------------------------+ + | ``uses_window_frame`` | ``supports_bounded_execution`` | ``include_rank`` | function_to_implement | + +========================+================================+==================+===========================+ + | False (default) | False (default) | False (default) | ``evaluate_all`` | + +------------------------+--------------------------------+------------------+---------------------------+ + | False | True | False | ``evaluate`` | + +------------------------+--------------------------------+------------------+---------------------------+ + | False | True/False | True | ``evaluate_all_with_rank``| + +------------------------+--------------------------------+------------------+---------------------------+ + | True | True/False | True/False | ``evaluate`` | + +------------------------+--------------------------------+------------------+---------------------------+ """ # noqa: W505 def memoize(self) -> None: @@ -299,41 +304,43 @@ def is_causal(self) -> bool: def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array: """Evaluate a window function on an entire input partition. - This function is called once per input *partition* for window - functions that *do not use* values from the window frame, - such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, - `CUME_DIST`, `LEAD`, `LAG`). + This function is called once per input *partition* for window functions that + *do not use* values from the window frame, such as + :py:func:`~datafusion.functions.row_number`, :py:func:`~datafusion.functions.rank`, + :py:func:`~datafusion.functions.dense_rank`, :py:func:`~datafusion.functions.percent_rank`, + :py:func:`~datafusion.functions.cume_dist`, :py:func:`~datafusion.functions.lead`, + and :py:func:`~datafusion.functions.lag`. It produces the result of all rows in a single pass. 
It
-        expects to receive the entire partition as the `value` and
+        expects to receive the entire partition as ``values`` and
         must produce an output column with one output row for every
         input row.
 
-        `num_rows` is required to correctly compute the output in case
-        `values.len() == 0`
+        ``num_rows`` is required to correctly compute the output in case
+        ``len(values) == 0``
 
-        Implementing this function is an optimization: certain window
+        Implementing this function is an optimization. Certain window
         functions are not affected by the window frame definition or
-        the query doesn't have a frame, and `evaluate` skips the
+        the query doesn't have a frame, and ``evaluate_all`` skips the
         (costly) window frame boundary calculation and the overhead of
-        calling `evaluate` for each output row.
+        calling ``evaluate`` for each output row.
 
         For example, the `LAG` built in window function does not use
         the values of its window frame (it can be computed in one shot
-        on the entire partition with `Self::evaluate_all` regardless of the
-        window defined in the `OVER` clause)
+        on the entire partition with ``evaluate_all`` regardless of the
+        window defined in the ``OVER`` clause)
 
-        ```sql
-        lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
-        ```
+        .. code-block:: text
 
-        However, `avg()` computes the average in the window and thus
-        does use its window frame
+            lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
 
-        ```sql
-        avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
-        ```
-        """
+        However, ``avg()`` computes the average in the window and thus
+        does use its window frame.
+
+        .. code-block:: text
+
+            avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
+        """  # noqa: W505
         pass
 
     def evaluate(
@@ -361,27 +368,28 @@ def evaluate_all_with_rank(
         """Called for window functions that only need the rank of a row.
 
         Evaluate the partition evaluator against the partition using
-        the row ranks. For example, `RANK(col)` produces
-
-        ```text
-        col | rank
-        --- + ----
-        A   | 1
-        A   | 1
-        C   | 3
-        D   | 4
-        D   | 5
-        ```
+        the row ranks. For example, ``rank(col("a"))`` produces
+
+        .. code-block:: text
+
+            a | rank
+            - + ----
+            A | 1
+            A | 1
+            C | 3
+            D | 4
+            D | 4
 
         For this case, `num_rows` would be `5` and the
         `ranks_in_partition` would be called with
 
-        ```text
-        [
-            (0,1),
-            (2,2),
-            (3,4),
-        ]
+        .. code-block:: text
+
+            [
+                (0,1),
+                (2,2),
+                (3,4),
+            ]
 
         The user must implement this method if ``include_rank`` returns True.
         """
@@ -405,7 +413,7 @@ def include_rank(self) -> bool:
 
 
 class WindowUDF:
-    """Class for performing window user defined functions (UDF).
+    """Class for performing window user-defined functions (UDWF).
 
     Window UDFs operate on a partition of rows. See also :py:class:`ScalarUDF` for
     operating on a row by row basis.
@@ -419,7 +427,7 @@ def __init__(
         return_type: pyarrow.DataType,
         volatility: Volatility | str,
     ) -> None:
-        """Instantiate a user defined window function (UDWF).
+        """Instantiate a user-defined window function (UDWF).
 
         See :py:func:`udwf` for a convenience function and argument
         descriptions.
@@ -445,7 +453,7 @@ def udwf(
         volatility: Volatility | str,
         name: str | None = None,
     ) -> WindowUDF:
-        """Create a new User Defined Window Function.
+        """Create a new User-Defined Window Function.
 
         Args:
             func: The python function.
@@ -455,7 +463,7 @@ def udwf(
             name: A descriptive name for the function.
 
         Returns:
-            A user defined window function.
+            A user-defined window function.
""" if not isinstance(func, WindowEvaluator): raise TypeError( From 0fc0b7432aece6c4579bcc667715268aa1510e55 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 29 Sep 2024 08:07:57 -0400 Subject: [PATCH 11/11] Small documentation update --- python/datafusion/udf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/udf.py b/python/datafusion/udf.py index 4cd7b75b..bb7a9086 100644 --- a/python/datafusion/udf.py +++ b/python/datafusion/udf.py @@ -270,7 +270,7 @@ def memoize(self) -> None: """Perform a memoize operation to improve performance. When the window frame has a fixed beginning (e.g UNBOUNDED - PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and + PRECEDING), some functions such as FIRST_VALUE and NTH_VALUE do not need the (unbounded) input once they have seen a certain amount of input.