Skip to content

Commit

Permalink
Kinda use map_partitions in indexed sjoin
Browse files Browse the repository at this point in the history
xref geopandas#114 (comment). This is an alternative way of writing the sjoin for the case where both sides have spatial partitions, using just high-level DataFrame APIs instead of generating the low-level dask.

There isn't a ton of advantage to this, because selecting the partitions generates a low-level graph, so you lose Blockwise fusion regardless.
  • Loading branch information
gjoseph92 committed Jan 19, 2022
1 parent 1073e40 commit 379e3d3
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 26 deletions.
70 changes: 47 additions & 23 deletions dask_geopandas/sjoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
from .core import from_geopandas, GeoDataFrame


def partitions_are_unchanged(part_idxs: np.ndarray, npartitions: int) -> bool:
"Whether selecting these partition indices would result in an identical DataFrame."
return len(part_idxs) == npartitions and (part_idxs[:-1] < part_idxs[1:]).all()


def sjoin(left, right, how="inner", op="intersects"):
"""
Spatial join of two GeoDataFrames.
Expand Down Expand Up @@ -58,33 +63,52 @@ def sjoin(left, right, how="inner", op="intersects"):
how="inner",
op="intersects",
)
parts_left = np.asarray(parts.index)
parts_right = np.asarray(parts["index_right"].values)
using_spatial_partitions = True
else:
# Unknown spatial partitions -> full cartesian (cross) product of all
# combinations of the partitions of the left and right dataframe
n_left = left.npartitions
n_right = right.npartitions
parts_left = np.repeat(np.arange(n_left), n_right)
parts_right = np.tile(np.arange(n_right), n_left)
using_spatial_partitions = False
parts_left = parts.index.values
parts_right = parts["index_right"].values
# Sub-select just the partitions from each input we need---unless we need all of them.
left_sub = (
left
if partitions_are_unchanged(parts_left, left.npartitions)
else left.partitions[parts_left]
)
right_sub = (
right
if partitions_are_unchanged(parts_right, right.npartitions)
else right.partitions[parts_right]
)

joined = left_sub.map_partitions(
geopandas.sjoin,
right_sub,
how,
op,
enforce_metadata=False,
transform_divisions=False,
align_dataframes=False,
meta=meta,
)

# TODO preserve spatial partitions of the output if only left has spatial
# partitions
joined.spatial_partitions = [
left.spatial_partitions.iloc[l].intersection(
right.spatial_partitions.iloc[r]
)
for l, r in zip(parts_left, parts_right)
]
return joined

# Unknown spatial partitions -> full cartesian (cross) product of all
# combinations of the partitions of the left and right dataframe
n_left = left.npartitions
n_right = right.npartitions
parts_left = np.repeat(np.arange(n_left), n_right)
parts_right = np.tile(np.arange(n_right), n_left)

dsk = {}
new_spatial_partitions = []
for i, (l, r) in enumerate(zip(parts_left, parts_right)):
dsk[(name, i)] = (geopandas.sjoin, (left._name, l), (right._name, r), how, op)
# TODO preserve spatial partitions of the output if only left has spatial
# partitions
if using_spatial_partitions:
lr = left.spatial_partitions.iloc[l]
rr = right.spatial_partitions.iloc[r]
# extent = lr.intersection(rr).buffer(buffer).intersection(lr.union(rr))
extent = lr.intersection(rr)
new_spatial_partitions.append(extent)

divisions = [None] * (len(dsk) + 1)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[left, right])
if not using_spatial_partitions:
new_spatial_partitions = None
return GeoDataFrame(graph, name, meta, divisions, new_spatial_partitions)
return GeoDataFrame(graph, name, meta, divisions, None)
13 changes: 10 additions & 3 deletions tests/test_sjoin.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import pytest

import geopandas
from geopandas.testing import assert_geodataframe_equal

import dask_geopandas


def test_sjoin_dask_geopandas():
@pytest.mark.parametrize(
"npartitions_left, npartitions_right", [(4, 4), (1, 3), (3, 1), (3, 4)]
)
def test_sjoin_dask_geopandas(npartitions_left, npartitions_right):
df_points = geopandas.read_file(geopandas.datasets.get_path("naturalearth_cities"))
ddf_points = dask_geopandas.from_geopandas(df_points, npartitions=4)
ddf_points = dask_geopandas.from_geopandas(df_points, npartitions=npartitions_left)

df_polygons = geopandas.read_file(
geopandas.datasets.get_path("naturalearth_lowres")
)
ddf_polygons = dask_geopandas.from_geopandas(df_polygons, npartitions=4)
ddf_polygons = dask_geopandas.from_geopandas(
df_polygons, npartitions=npartitions_right
)

expected = geopandas.sjoin(df_points, df_polygons, op="within", how="inner")
expected = expected.sort_index()
Expand Down

0 comments on commit 379e3d3

Please sign in to comment.