Optimize rasterization of categorical NdOverlays #6206

Open · wants to merge 2 commits into main
99 changes: 65 additions & 34 deletions holoviews/operation/datashader.py
@@ -272,12 +272,14 @@ def get_agg_data(cls, obj, category=None):
kdims = list(obj.kdims)
vdims = list(obj.vdims)
dims = obj.dimensions()[:2]
per_layer_cat = False
if isinstance(obj, Path):
glyph = 'line'
for p in obj.split(datatype='dataframe'):
paths.append(p)
elif isinstance(obj, CompositeOverlay):
element = None
per_layer_cat = category and [category] == obj.kdims
for key, el in obj.data.items():
x, y, element, glyph = cls.get_agg_data(el)
dims = (x, y)
@@ -299,44 +301,58 @@ def get_agg_data(cls, obj, category=None):
else:
x, y = dims


if len(paths) > 1:
if glyph == 'line':
if glyph == 'line' and not per_layer_cat:
path = paths[0][:1]
if isinstance(path, dd.DataFrame):
path = path.compute()
empty = path.copy()
empty.iloc[0, :] = (np.nan,) * empty.shape[1]
paths = [elem for p in paths for elem in (p, empty)][:-1]
if all(isinstance(path, dd.DataFrame) for path in paths):
if per_layer_cat:
df = paths
elif all(isinstance(path, dd.DataFrame) for path in paths):
df = dd.concat(paths)
else:
paths = [p.compute() if isinstance(p, dd.DataFrame) else p for p in paths]
df = pd.concat(paths)
else:
df = paths[0] if paths else pd.DataFrame([], columns=[x.name, y.name])
if category and df[category].dtype.name != 'category':
df[category] = df[category].astype('category')

is_custom = isinstance(df, dd.DataFrame) or cuDFInterface.applies(df)
if any((not is_custom and len(df[d.name]) and isinstance(df[d.name].values[0], cftime_types)) or
df[d.name].dtype.kind in ["M", "u"] for d in (x, y)):
df = df.copy()

for d in (x, y):
vals = df[d.name]
if not is_custom and len(vals) and isinstance(vals.values[0], cftime_types):
vals = cftime_to_timestamp(vals, 'ns')
elif vals.dtype.kind == 'M':
vals = vals.astype('datetime64[ns]')
elif vals.dtype == np.uint64:
raise TypeError(f"Dtype of uint64 for column {d.name} is not supported.")
elif vals.dtype.kind == 'u':
pass # To convert to int64
else:
continue
df[d.name] = cast_array_to_int64(vals)
return x, y, Dataset(df, kdims=kdims, vdims=vdims), glyph
dfs = df if isinstance(df, list) else [df]
converted = []
for sdf in dfs:
if category and sdf[category].dtype.name != 'category':
sdf[category] = sdf[category].astype('category')

is_custom = isinstance(sdf, dd.DataFrame) or cuDFInterface.applies(sdf)
if any((not is_custom and len(sdf[d.name]) and isinstance(sdf[d.name].values[0], cftime_types)) or
sdf[d.name].dtype.kind in ["M", "u"] for d in (x, y)):
sdf = sdf.copy()

for d in (x, y):
vals = sdf[d.name]
if not is_custom and len(vals) and isinstance(vals.values[0], cftime_types):
vals = cftime_to_timestamp(vals, 'ns')
elif vals.dtype.kind == 'M':
vals = vals.astype('datetime64[ns]')
elif vals.dtype == np.uint64:
raise TypeError(f"Dtype of uint64 for column {d.name} is not supported.")
elif vals.dtype.kind == 'u':
pass # To convert to int64
else:
continue
sdf[d.name] = cast_array_to_int64(vals)
converted.append(sdf)

kwargs = {}
if len(converted) == 1:
df = converted[0]
else:
df = converted
kwargs['datatype'] = ['multitabular']
return x, y, Dataset(df, kdims=kdims, vdims=vdims, **kwargs), glyph
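For context on the new `per_layer_cat` branch: when the requested category is the overlay's own key dimension, every layer holds exactly one category value, so the layer dataframes can stay as a list (handed to a `multitabular` Dataset) instead of being concatenated into one large categorical frame. A minimal sketch of the difference, using made-up frames and column names:

```python
# Illustrative sketch (not PR code): per-layer frames vs. one
# concatenated categorical frame for an NdOverlay keyed on the category.
import numpy as np
import pandas as pd

layers = {
    'a': pd.DataFrame({'x': np.arange(3.), 'y': np.arange(3.), 'cat': 'a'}),
    'b': pd.DataFrame({'x': np.arange(3.), 'y': np.arange(3.) * 2, 'cat': 'b'}),
}

# Previous path: copy every layer into one frame with a categorical
# column (and, for line glyphs, interleave NaN separator rows).
combined = pd.concat(layers.values(), ignore_index=True)
combined['cat'] = combined['cat'].astype('category')

# New path when [category] == obj.kdims: the frames stay separate and
# each layer can be rasterized on its own.
per_layer = list(layers.values())
```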

def _process(self, element, key=None):
agg_fn = self._get_aggregator(element, self.p.aggregator)
@@ -380,34 +396,49 @@ def _process(self, element, key=None):
if self.p.line_width and glyph == 'line' and ds_version >= Version('0.14.0'):
agg_kwargs['line_width'] = self.p.line_width

dfdata = PandasInterface.as_dframe(data)
cvs_fn = getattr(cvs, glyph)
if data.interface.multi:
aggs = []
for sdf in data.data:
dfdata = PandasInterface.as_dframe(data.clone(sdf, datatype=Dataset.datatype))
agg = self._compute_aggregate(
dfdata, cvs_fn, x, y, xtype, ytype, agg_fn, sel_fn, params, agg_kwargs
)
aggs.append(agg)
agg = xr.concat(aggs, agg_fn.cat_column)
else:
dfdata = PandasInterface.as_dframe(data)
agg = self._compute_aggregate(
dfdata, cvs_fn, x, y, xtype, ytype, agg_fn, sel_fn, params, agg_kwargs
)

if isinstance(agg, xr.Dataset) or agg.ndim == 2:
# Replacing x and y coordinates to avoid numerical precision issues
eldata = agg if ds_version > Version('0.5.0') else (xs, ys, agg.data)
return self.p.element_type(eldata, **params)
else:
params['vdims'] = list(map(str, agg.coords[agg_fn.column].data))
return ImageStack(agg, **params)
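The per-layer branch above relies on `xr.concat` to stitch the individual 2D aggregates back into a single array indexed by the category. A self-contained sketch of that step, where the array values and the `cat` dimension name are purely illustrative:

```python
# Sketch of xr.concat(aggs, agg_fn.cat_column): combine per-category
# 2D aggregates along a new category dimension.
import numpy as np
import xarray as xr

coords = {'y': np.arange(4), 'x': np.arange(5)}
agg_a = xr.DataArray(np.zeros((4, 5)), coords=coords, dims=('y', 'x'), name='count')
agg_b = xr.DataArray(np.ones((4, 5)), coords=coords, dims=('y', 'x'), name='count')

# Tag each aggregate with its category value as a scalar coordinate;
# concat then promotes 'cat' to a new leading dimension with those values.
stacked = xr.concat(
    [agg_a.assign_coords(cat='a'), agg_b.assign_coords(cat='b')],
    dim='cat',
)
assert stacked.dims == ('cat', 'y', 'x')
```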

def _compute_aggregate(self, data, aggregator, x, y, xtype, ytype, agg_fn, sel_fn, params, agg_kwargs):
if sel_fn:
if isinstance(params["vdims"], (Dimension, str)):
params["vdims"] = [params["vdims"]]
sum_agg = ds.summary(**{str(params["vdims"][0]): agg_fn, "index": ds.where(sel_fn)})
agg = self._apply_datashader(dfdata, cvs_fn, sum_agg, agg_kwargs, x, y)
agg = self._apply_datashader(data, aggregator, sum_agg, agg_kwargs, x, y)
_ignore = [*params["vdims"], "index"]
sel_vdims = [s for s in agg if s not in _ignore]
params["vdims"] = [*params["vdims"], *sel_vdims]
else:
agg = self._apply_datashader(dfdata, cvs_fn, agg_fn, agg_kwargs, x, y)
agg = self._apply_datashader(data, aggregator, agg_fn, agg_kwargs, x, y)

if 'x_axis' in agg.coords and 'y_axis' in agg.coords:
agg = agg.rename({'x_axis': x, 'y_axis': y})
if xtype == 'datetime':
agg[x.name] = agg[x.name].astype('datetime64[ns]')
if ytype == 'datetime':
agg[y.name] = agg[y.name].astype('datetime64[ns]')

if isinstance(agg, xr.Dataset) or agg.ndim == 2:
# Replacing x and y coordinates to avoid numerical precision issues
eldata = agg if ds_version > Version('0.5.0') else (xs, ys, agg.data)
return self.p.element_type(eldata, **params)
else:
params['vdims'] = list(map(str, agg.coords[agg_fn.column].data))
return ImageStack(agg, **params)
return agg

def _apply_datashader(self, dfdata, cvs_fn, agg_fn, agg_kwargs, x, y):
# Suppress numpy warning emitted by dask:
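For reference, a minimal sketch of the kind of plot this PR optimizes. The data and dimension names are made up; the point is that a `count_cat` aggregator whose category matches the overlay's key dimension is what would exercise the new per-layer path:

```python
# Hypothetical usage: rasterize a categorical NdOverlay of Curves.
import numpy as np
import datashader as ds
import holoviews as hv
from holoviews.operation.datashader import rasterize

hv.extension('bokeh')

curves = hv.NdOverlay({
    cat: hv.Curve((np.arange(100), np.cumsum(np.random.randn(100))))
    for cat in 'abc'
}, kdims='cat')

# Aggregating by the overlay's key dimension: each Curve is rasterized
# separately instead of concatenating all layer dataframes (plus NaN
# separator rows) into one categorical frame.
img = rasterize(curves, aggregator=ds.count_cat('cat'))
```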