Skip to content

Commit

Permalink
fix(api): allow scalar window order keys
Browse files Browse the repository at this point in the history
Previously, the builder API disallowed passing scalar order keys such as ibis.NA and ibis.random().
  • Loading branch information
kszucs authored and cpcloud committed Jul 20, 2023
1 parent 3997451 commit 3d3f4f3
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 26 deletions.
28 changes: 17 additions & 11 deletions ibis/backends/dask/execution/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,25 @@ def compute_sort_key(
`execute` the expression and sort by the new derived column.
"""
name = ibis.util.guid()
if key.name in data:
return name, data[key.name]
if isinstance(key, str):
return key, None
if key.output_shape.is_columnar():
if key.name in data:
return name, data[key.name]
if isinstance(key, str):
return key, None
else:
if scope is None:
scope = Scope()
scope = scope.merge_scopes(
Scope({t: data}, timecontext)
for t in an.find_immediate_parent_tables(key)
)
new_column = execute(key, scope=scope, **kwargs)
new_column.name = name
return name, new_column
else:
if scope is None:
scope = Scope()
scope = scope.merge_scopes(
Scope({t: data}, timecontext) for t in an.find_immediate_parent_tables(key)
raise NotImplementedError(
"Scalar sort keys are not yet supported in the dask backend"
)
new_column = execute(key, scope=scope, **kwargs)
new_column.name = name
return name, new_column


def compute_sorted_frame(
Expand Down
28 changes: 16 additions & 12 deletions ibis/backends/pandas/execution/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ def get_join_suffix_for_op(op: ops.TableColumn, join_op: ops.Join):


def compute_sort_key(key, data, timecontext, scope=None, **kwargs):
    """Compute a sort key for `data` from the expression `key`.

    Parameters
    ----------
    key
        Sort-key operation node. Must be columnar; scalar keys are rejected.
    data
        The pandas DataFrame being sorted.
    timecontext
        Time context propagated into the execution scope.
    scope
        Optional existing `Scope`; a new one is created when ``None``.

    Returns
    -------
    tuple
        ``(name, column)`` where ``column`` is ``None`` when `data` already
        contains the key column, otherwise a freshly executed Series under a
        generated unique name.

    Raises
    ------
    NotImplementedError
        If `key` is scalar-shaped (e.g. ``ibis.NA`` or ``ibis.random()``),
        which this backend cannot yet sort by.
    """
    # NOTE: the diff rendering of this hunk interleaved old and new lines;
    # this is the reconstructed post-commit body (columnar vs. scalar split).
    if key.output_shape.is_columnar():
        if key.name in data:
            # The key already exists as a column; sort by it directly.
            return key.name, None
        else:
            # Derive the key by executing the expression against `data`
            # under a scope that maps each parent table to the frame.
            if scope is None:
                scope = Scope()
            scope = scope.merge_scopes(
                Scope({t: data}, timecontext)
                for t in an.find_immediate_parent_tables(key)
            )
            new_column = execute(key, scope=scope, **kwargs)
            # Use a guid so the derived column cannot collide with
            # an existing column name.
            name = ibis.util.guid()
            new_column.name = name
            return name, new_column
    else:
        raise NotImplementedError(
            "Scalar sort keys are not yet supported in the pandas backend"
        )


def compute_sorted_frame(df, order_by, group_by=(), timecontext=None, **kwargs):
Expand Down
10 changes: 10 additions & 0 deletions ibis/backends/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,16 @@ def test_simple_ungrouped_unbound_following_window(
backend.assert_series_equal(result, expected)


@pytest.mark.broken(["pandas", "dask"], raises=NotImplementedError)
@pytest.mark.notimpl(["datafusion", "polars"], raises=com.OperationNotDefinedError)
def test_simple_ungrouped_window_with_scalar_order_by(backend, alltypes):
    # Build a deterministic base table, then order the window itself by a
    # scalar (NULL) key — previously rejected by the builder API.
    filtered = alltypes[alltypes.double_col < 50]
    t = filtered.order_by('id')
    window = ibis.window(rows=(0, None), order_by=ibis.NA)
    summed = t.double_col.sum().over(window)
    expr = summed.name('double_col')
    # hard to reproduce this in pandas, so just test that it actually executes
    expr.execute()


@pytest.mark.parametrize(
("result_fn", "expected_fn", "ordered"),
[
Expand Down
8 changes: 6 additions & 2 deletions ibis/expr/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,16 @@ class WindowBuilder(Builder):

how = rlz.optional(rlz.isin({'rows', 'range'}), default="rows")
start = end = rlz.optional(rlz.option(rlz.range_window_boundary))
groupings = orderings = rlz.optional(
groupings = rlz.optional(
rlz.tuple_of(
rlz.one_of([rlz.column(rlz.any), rlz.instance_of((str, Deferred))])
rlz.one_of([rlz.instance_of((str, Deferred)), rlz.column(rlz.any)])
),
default=(),
)
orderings = rlz.optional(
rlz.tuple_of(rlz.one_of([rlz.instance_of((str, Deferred)), rlz.any])),
default=(),
)
max_lookback = rlz.optional(rlz.interval)

def _maybe_cast_boundary(self, boundary, dtype):
Expand Down
2 changes: 1 addition & 1 deletion ibis/expr/operations/sortkeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class SortKey(Value):
ascending = rlz.optional(rlz.bool_, default=True)

output_dtype = rlz.dtype_like("expr")
output_shape = rlz.Shape.COLUMNAR
output_shape = rlz.shape_like("expr")

@property
def name(self) -> str:
Expand Down
11 changes: 11 additions & 0 deletions ibis/tests/expr/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,14 @@ def test_expression_class_aliases():
assert ir.AnyValue is ir.Value
assert ir.AnyScalar is ir.Scalar
assert ir.AnyColumn is ir.Column


def test_sortkey_propagates_dtype_and_shape():
    # A sort key over a scalar literal is itself scalar-shaped...
    scalar_key = ops.SortKey(ibis.literal(1), ascending=True)
    assert scalar_key.output_dtype == dt.int8
    assert scalar_key.output_shape == rlz.Shape.SCALAR

    # ...while a sort key over a table column stays columnar.
    table = ibis.table([('a', 'int16')], name='t')
    column_key = ops.SortKey(table.a, ascending=True)
    assert column_key.output_dtype == dt.int16
    assert column_key.output_shape == rlz.Shape.COLUMNAR
22 changes: 22 additions & 0 deletions ibis/tests/expr/test_window_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ def test_window_api_supports_value_expressions(alltypes):
)


def test_window_api_supports_scalar_order_by(alltypes):
    # Scalar expressions (NULL, random) are now accepted as window order keys
    # and bind to a rows frame whose order_by holds the scalar op.
    t = alltypes

    for scalar_key in (ibis.NA, ibis.random()):
        w = ibis.window(order_by=scalar_key)
        assert w.bind(t) == ops.RowsWindowFrame(
            table=t,
            start=None,
            end=None,
            group_by=(),
            order_by=(scalar_key.op(),),
        )


def test_window_api_properly_determines_how():
assert ibis.window(between=(None, 5)).how == 'rows'
assert ibis.window(between=(1, 3)).how == 'rows'
Expand Down

0 comments on commit 3d3f4f3

Please sign in to comment.