diff --git a/gpm/bucket/partitioning.py b/gpm/bucket/partitioning.py index dc0503a4..b1c7570a 100644 --- a/gpm/bucket/partitioning.py +++ b/gpm/bucket/partitioning.py @@ -1,26 +1,29 @@ from functools import wraps + import numpy as np -import pandas as pd +import pandas as pd import polars as pl + from gpm.utils.geospatial import ( - _check_size, - check_extent, Extent, - get_geographic_extent_around_point, - get_country_extent, + _check_size, + check_extent, get_continent_extent, + get_country_extent, get_extent_around_point, + get_geographic_extent_around_point, ) + ####--------------------------------------------------------. #### Grids ## Unstructured -# - Planar +# - Planar # - Spherical -# --> Regular Geometry -# --> Irregular Geometry +# --> Regular Geometry +# --> Irregular Geometry ## Structured -# 2D on the sphere +# 2D on the sphere # - SwathDefinition # - CRS: geographic @@ -31,21 +34,21 @@ ####--------------------------------------------------------. #### Partitioning # 2D Partitioning: -# --> XYPartitioning: size +# --> XYPartitioning: size # --> GeographicPartitioning # --> TilePartitioning # - Last partition not granted to have same size as the other -# - If size = resolution, share same property as an AreaDefinition +# - If size = resolution, share same property as an AreaDefinition # - Directory: 1 level (TilePartitioning) or 2 levels (XYPartitioning/GeographicPartitioning) # - CRS: geographic / projected -# - Query by extent: OK +# - Query by extent: OK # - Query by geometry: First subset by extent, then intersect quadmesh? # KdTree/RTree Partitioning # --> https://rtree.readthedocs.io/en/latest/ # SphericalPartitioning -# --> H3Partitioning (Uber’s Hexagonal Hierarchical) +# --> H3Partitioning (Uber’s Hexagonal Hierarchical) # --> S2Partitioning (Google Geometry) (Hierarchical, discrete, global grid system) (Square cells) # --> HealpixPartitioning # --> GaussianGrid, CubedSphere, ... (other hierarchical, discrete, and global grid system) @@ -53,32 +56,32 @@ # --> Geohash: a system for encoding locations using a string of characters, creating a hierarchical, square grid system (a quadtree). # - Directory: 1 level -# - Query by extent: Define polygon and query by geometry ??? +# - Query by extent: Define polygon and query by geometry ??? # - Query by geometry: First subset by extent, then intersect quadmesh? # GeometryPartitioning -# - Can include all partitioning +# - Can include all partitioning # GeometryBucket --> GeometryIntersection, save geometries in GeoParquet -#### CentroidBucket +#### CentroidBucket -#### GeometryBucket -# - Exploit kdtree and dask -# - Borrow/Replace from pyresample bucket +#### GeometryBucket +# - Exploit kdtree and dask +# - Borrow/Replace from pyresample bucket # - Output to xvec/datatree? # - Intermediate for remapping (weighted fraction, fraction_overlap, conservative) ####--------------------------------------------------------. #### Bucket Improvements -# Readers -# - gpm.bucket.read_around_point(bucket_dir, lon, lat, distance, size, **polars_kwargs) -# --> compute distance on subset and select below threshold +# Readers +# - gpm.bucket.read_around_point(bucket_dir, lon, lat, distance, size, **polars_kwargs) +# --> compute distance on subset and select below threshold # --> https://stackoverflow.com/questions/76262681/i-need-to-create-a-column-with-the-distance-between-two-coordinates-in-polars -# - gpm.bucket.read_within_extent(bucket_dir, extent, **polars_kwargs) -# - gpm.bucket.read_within_country(bucket_dir, country, **polars_kwargs) -# - gpm.bucket.read_within_continent(bucket_dir, continent, **polars_kwargs) +# - gpm.bucket.read_within_extent(bucket_dir, extent, **polars_kwargs) +# - gpm.bucket.read_within_country(bucket_dir, country, **polars_kwargs) +# - gpm.bucket.read_within_continent(bucket_dir, continent, **polars_kwargs) # Core: # - list_partitions_within_extent @@ -86,24 +89,24 @@ # - read_within_extent ! # Routines -# - Routine to repartition in smaller partitions (disaggregate bucket) -# - Routine to repartition in larger partitions (aggregate bucket) - -# Analysis -# - Group by overpass -# - Reformat to dataset / Generator +# - Routine to repartition in smaller partitions (disaggregate bucket) +# - Routine to repartition in larger partitions (aggregate bucket) + +# Analysis +# - Group by overpass +# - Reformat to dataset / Generator -# Writers -# - RENAME: write_partitioned_dataset into write_arrow_dataset +# Writers +# - RENAME: write_partitioned_dataset into write_arrow_dataset ####--------------------------------------------------------. -#### Query directories -# Format +#### Query directories +# Format # - hive: xbin=xx/ybin=yy or tile_id=id -# - xx/yy or id +# - xx/yy or id ## Option1 (faster if small query and lot of directories) -# - Create directories paths -# - Check if they exists +# - Create directories paths +# - Check if they exists ## Option2 (might be faster for large queries) ## - List directories ## - Filter by lon/lat @@ -116,102 +119,102 @@ # - https://h3geo.org/docs/comparisons/s2 # - H3: https://github.com/uber/h3-py # https://pypi.org/project/h3/ # --> h3.latlng_to_cell(lat, lng, resolution) - + ####--------------------------------------------------------------------------------------------- # ProjectionPartitioning -# Enable distance in meters and compute planar or on sphere +# Enable distance in meters and compute planar or on sphere # def get_partitions_around_point(self, x, y, distance=None, size=None): # extent = get_extent_around_point(x, y, distance=distance, size=size) # return self.get_partitions_by_extent(extent=extent) # Requires pyproj CRS. Backproject x,y to lon/lat # def get_partitions_by_country(self, name, padding=None): -# extent = get_country_extent(name, padding=padding) +# extent = get_country_extent(name, padding=padding) # return self.get_partitions_by_extent(extent=extent) # Requires pyproj CRS. Backproject x,y to lon/lat # def get_partitions_by_continent(self, name, padding=None): -# extent = get_continent_extent(name, padding=padding) +# extent = get_continent_extent(name, padding=padding) # return self.get_partitions_by_extent(extent=extent) ####--------------------------------------------------------------------------------------------- def check_valid_dataframe(func): - """ - Decorator to check if the first argument or the keyword argument 'df' - is a `pandas.DataFrame` or a `polars.DataFrame`. - """ - @wraps(func) - def wrapper(*args, **kwargs): - # Check if 'df' is in kwargs, otherwise assume it is the first positional argument - df = kwargs.get('df', args[1] if len(args) == 2 else None) - # Validate the DataFrame - if not isinstance(df, (pd.DataFrame, pl.DataFrame)): - raise TypeError("The 'df' argument must be either a pandas.DataFrame or a polars.DataFrame.") - return func(*args, **kwargs) - return wrapper + """ + Decorator to check if the first argument or the keyword argument 'df' + is a `pandas.DataFrame` or a `polars.DataFrame`. + """ + + @wraps(func) + def wrapper(*args, **kwargs): + # Check if 'df' is in kwargs, otherwise assume it is the first positional argument + df = kwargs.get("df", args[1] if len(args) == 2 else None) + # Validate the DataFrame + if not isinstance(df, (pd.DataFrame, pl.DataFrame)): + raise TypeError("The 'df' argument must be either a pandas.DataFrame or a polars.DataFrame.") + return func(*args, **kwargs) + + return wrapper def check_valid_x_y(df, x, y): """Check if the x and y columns are in the dataframe.""" - if y not in df: + if y not in df: raise ValueError(f"y='{y}' is not a column of the dataframe.") - if x not in df: + if x not in df: raise ValueError(f"x='{x}' is not a column of the dataframe.") def ensure_xy_without_nan_values(df, x, y, remove_invalid_rows=True): """Ensure valid coordinates in the dataframe.""" - # Remove NaN vales - if remove_invalid_rows: + # Remove NaN vales + if remove_invalid_rows: if isinstance(df, pd.DataFrame): - return df.dropna(subset=[x,y]) - else: + return df.dropna(subset=[x, y]) + else: return df.filter(~pl.col(x).is_null() | ~pl.col(y).is_null()) - + # Check no NaN values - if isinstance(df, pd.DataFrame): + if isinstance(df, pd.DataFrame): indices = df[[x, y]].isna().any(axis=1) - else: - indices = (df[x].is_null() | df[x].is_null()) - if indices.any(): - rows_indices = np.where(indices)[0].tolist() - raise ValueError(f"Null values present in columns {x} and {y} at rows: {rows_indices}") - return df + else: + indices = df[x].is_null() | df[x].is_null() + if indices.any(): + rows_indices = np.where(indices)[0].tolist() + raise ValueError(f"Null values present in columns {x} and {y} at rows: {rows_indices}") + return df def ensure_valid_partitions(df, xbin, ybin, remove_invalid_rows=True): """Ensure valid partitions labels in the dataframe.""" - # Remove NaN values - if remove_invalid_rows: + # Remove NaN values + if remove_invalid_rows: if isinstance(df, pd.DataFrame): - return df.dropna(subset=[xbin,ybin]) - else: + return df.dropna(subset=[xbin, ybin]) + else: df = df.filter(~pl.col(xbin).is_in(["outside_right", "outside_left"])) df = df.filter(~pl.col(ybin).is_in(["outside_right", "outside_left"])) df = df.filter(~pl.col(xbin).is_null() | ~pl.col(ybin).is_null()) return df - - # Check no invalid partitions (NaN or polars outside_right/outside_left) - if isinstance(df, pd.DataFrame): + + # Check no invalid partitions (NaN or polars outside_right/outside_left) + if isinstance(df, pd.DataFrame): indices = df[[xbin, ybin]].isna().any(axis=1) else: - indices = (df[xbin].is_in(["outside_right", "outside_left"]) | - df[ybin].is_in(["outside_right", "outside_left"]) - ) - if indices.any(): - rows_indices = np.where(indices)[0].tolist() - raise ValueError(f"Out of extent x,y coordinates at rows: {rows_indices}") - + indices = df[xbin].is_in(["outside_right", "outside_left"]) | df[ybin].is_in(["outside_right", "outside_left"]) + if indices.any(): + rows_indices = np.where(indices)[0].tolist() + raise ValueError(f"Out of extent x,y coordinates at rows: {rows_indices}") -def get_array_combinations(x,y): + +def get_array_combinations(x, y): """Return all the combinations between the two array.""" # Create the mesh grid grid1, grid2 = np.meshgrid(x, y) # Stack and reshape the grid arrays to get combinations combinations = np.vstack([grid1.ravel(), grid2.ravel()]).T - return combinations + return combinations def get_n_decimals(number): @@ -234,7 +237,7 @@ def get_breaks(size, vmin, vmax): return breaks -def get_midpoints(size, vmin, vmax): +def get_midpoints(size, vmin, vmax): """Define partitions midpoints.""" breaks = get_breaks(size, vmin=vmin, vmax=vmax) midpoints = breaks[0:-1] + size / 2 @@ -262,14 +265,14 @@ def get_breaks_and_labels(size, vmin, vmax): return breaks, labels -def query_labels(values, breaks, labels): +def query_labels(values, breaks, labels): """Return the partition labels for the specified coordinates.""" # TODO: flag to raise error for NaN, None? values = np.atleast_1d(np.asanyarray(values)).astype(float) return pd.cut(values, bins=breaks, labels=labels, include_lowest=True, right=True) - -def query_midpoints(values, breaks, midpoints): + +def query_midpoints(values, breaks, midpoints): """Return the partition midpoints for the specified coordinates.""" values = np.atleast_1d(np.asanyarray(values)).astype(float) return pd.cut(values, bins=breaks, labels=midpoints, include_lowest=True, right=True).astype(float) @@ -279,15 +282,15 @@ def add_pandas_xy_partitions(df, size, extent, x, y, xbin, ybin, remove_invalid_ """Add partitions labels to a pandas DataFrame based on x, y coordinates.""" # Check x,y names check_valid_x_y(df, x, y) - # Check/remove rows with NaN x,y columns + # Check/remove rows with NaN x,y columns df = ensure_xy_without_nan_values(df, x=x, y=y, remove_invalid_rows=remove_invalid_rows) - # Retrieve breaks and labels (N and N+1) + # Retrieve breaks and labels (N and N+1) cut_x_breaks, cut_x_labels = get_breaks_and_labels(size[0], vmin=extent[0], vmax=extent[1]) cut_y_breaks, cut_y_labels = get_breaks_and_labels(size[1], vmin=extent[2], vmax=extent[3]) # Add partitions labels columns df[xbin] = query_labels(df[x].to_numpy(), breaks=cut_x_breaks, labels=cut_x_labels) df[ybin] = query_labels(df[y].to_numpy(), breaks=cut_y_breaks, labels=cut_y_labels) - # Check/remove rows with invalid partitions (NaN) + # Check/remove rows with invalid partitions (NaN) df = ensure_valid_partitions(df, xbin=xbin, ybin=ybin, remove_invalid_rows=remove_invalid_rows) return df @@ -296,12 +299,12 @@ def add_polars_xy_partitions(df, x, y, size, extent, xbin, ybin, remove_invalid_ """Add partitions to a polars DataFrame based on x, y coordinates.""" # Check x,y names check_valid_x_y(df, x, y) - # Check/remove rows with null x,y columns + # Check/remove rows with null x,y columns df = ensure_xy_without_nan_values(df, x=x, y=y, remove_invalid_rows=remove_invalid_rows) # Retrieve breaks and labels (N and N+1) cut_x_breaks, cut_x_labels = get_breaks_and_labels(size[0], vmin=extent[0], vmax=extent[1]) cut_y_breaks, cut_y_labels = get_breaks_and_labels(size[1], vmin=extent[2], vmax=extent[3]) - # Add outside labels for polars cut function + # Add outside labels for polars cut function cut_x_labels = ["outside_left", *cut_x_labels, "outside_right"] cut_y_labels = ["outside_left", *cut_y_labels, "outside_right"] # Deal with left inclusion @@ -311,38 +314,37 @@ def add_polars_xy_partitions(df, x, y, size, extent, xbin, ybin, remove_invalid_ df = df.with_columns( pl.col(x).cut(cut_x_breaks, labels=cut_x_labels, left_closed=False).alias(xbin), pl.col(y).cut(cut_y_breaks, labels=cut_y_labels, left_closed=False).alias(ybin), - ) - # Check/remove rows with invalid partitions (out of extent or Null) + ) + # Check/remove rows with invalid partitions (out of extent or Null) df = ensure_valid_partitions(df, xbin=xbin, ybin=ybin, remove_invalid_rows=remove_invalid_rows) - return df - + return df -def add_polars_xy_tile_partitions(df, size, extent, x,y, tile_id): +def add_polars_xy_tile_partitions(df, size, extent, x, y, tile_id): check_valid_x_y(df, x, y) - raise NotImplementedError() + raise NotImplementedError -def add_pandas_xy_tile_partitions(df, size, extent, x,y, tile_id): +def add_pandas_xy_tile_partitions(df, size, extent, x, y, tile_id): check_valid_x_y(df, x, y) - raise NotImplementedError() - + raise NotImplementedError -def df_to_xarray(df, xbin, ybin, size, extent, new_x=None, new_y=None): + +def df_to_xarray(df, xbin, ybin, size, extent, new_x=None, new_y=None): """Convert dataframe to xarray Dataset based on specified partitions midpoints. - - The partitioning cells not present in the dataframe are set to NaN. + + The partitioning cells not present in the dataframe are set to NaN. """ if isinstance(df, pl.DataFrame): df = df.to_pandas() - if set(df.index.names) == set([xbin, ybin]): + if set(df.index.names) == {xbin, ybin}: df = df.reset_index() - - # Ensure index is float or string + + # Ensure index is float or string df[xbin] = df[xbin].astype(float) df[ybin] = df[ybin].astype(float) df = df.set_index([xbin, ybin]) - + # Create an empty DataFrame with the MultiIndex x_midpoints = get_midpoints(size[0], vmin=extent[0], vmax=extent[1]) y_midpoints = get_midpoints(size[1], vmin=extent[2], vmax=extent[3]) @@ -354,16 +356,16 @@ def df_to_xarray(df, xbin, ybin, size, extent, new_x=None, new_y=None): # Create final dataframe df_full = empty_df.join(df, how="left") - + # Reshape to xarray ds = df_full.to_xarray() ds[xbin] = ds[xbin].astype(float) - + # Rename dictionary rename_dict = {} - if new_x is not None: - rename_dict[xbin] = new_x - if new_y is not None: + if new_x is not None: + rename_dict[xbin] = new_x + if new_y is not None: rename_dict[ybin] = new_y ds = ds.rename(rename_dict) return ds @@ -372,15 +374,15 @@ def df_to_xarray(df, xbin, ybin, size, extent, new_x=None, new_y=None): class XYPartitioning: """ Handales partitioning of data into x and y bins. - - Parameters: + + Parameters ---------- xbin : float Identifier for the x bin. ybin : float Identifier for the y bin. size : int, float, tuple, list - The size value(s) of the bins. + The size value(s) of the bins. The function interprets the input as follows: - int or float: The same size is enforced in both x and y directions. - tuple or list: The bin size for the x and y directions. @@ -388,15 +390,16 @@ class XYPartitioning: The extent for the partitioning specified as [xmin, xmax, ymin, ymax]. """ + def __init__(self, xbin, ybin, size, extent): - # Define extent + # Define extent self.extent = check_extent(extent) # Define bin size self.size = _check_size(size) # Define bin names - self.xbin = xbin + self.xbin = xbin self.ybin = ybin - # Define breaks, midpoints and labels + # Define breaks, midpoints and labels self.x_breaks = get_breaks(size=self.size[0], vmin=self.extent.xmin, vmax=self.extent.xmax) self.y_breaks = get_breaks(size=self.size[1], vmin=self.extent.ymin, vmax=self.extent.ymax) self.x_midpoints = get_midpoints(size=self.size[0], vmin=self.extent.xmin, vmax=self.extent.xmax) @@ -406,67 +409,74 @@ def __init__(self, xbin, ybin, size, extent): # Define info self.shape = (len(self.x_labels), len(self.y_labels)) self.n_partitions = len(self.x_labels) * len(self.y_labels) - self.n_x = self.shape[0] - self.n_y = self.shape[1] - + self.n_x = self.shape[0] + self.n_y = self.shape[1] + @property def partitions(self): return [self.xbin, self.ybin] - + @check_valid_dataframe - def add_partitions(self, df, x, y, remove_invalid_rows=True): - if isinstance(df, pd.DataFrame): - return add_pandas_xy_partitions(df=df, x=x, y=y, - size=self.size, - extent=self.extent, - xbin=self.xbin, - ybin=self.ybin, - remove_invalid_rows=remove_invalid_rows) - return add_polars_xy_partitions(df=df, x=x, y=y, - size=self.size, - extent=self.extent, - xbin=self.xbin, - ybin=self.ybin, - remove_invalid_rows=remove_invalid_rows) - + def add_partitions(self, df, x, y, remove_invalid_rows=True): + if isinstance(df, pd.DataFrame): + return add_pandas_xy_partitions( + df=df, + x=x, + y=y, + size=self.size, + extent=self.extent, + xbin=self.xbin, + ybin=self.ybin, + remove_invalid_rows=remove_invalid_rows, + ) + return add_polars_xy_partitions( + df=df, + x=x, + y=y, + size=self.size, + extent=self.extent, + xbin=self.xbin, + ybin=self.ybin, + remove_invalid_rows=remove_invalid_rows, + ) + @check_valid_dataframe def to_xarray(self, df, new_x=None, new_y=None): - return df_to_xarray(df, - xbin=self.xbin, - ybin=self.ybin, - size=self.size, extent=self.extent, - new_x=new_x, - new_y=new_y) - - def to_dict(self): - dictionary = {"name": self.__class__.__name__, - "extent": list(self.extent), - "size": self.size, - "xbin": self.xbin, - "ybin": self.ybin} + return df_to_xarray( + df, xbin=self.xbin, ybin=self.ybin, size=self.size, extent=self.extent, new_x=new_x, new_y=new_y + ) + + def to_dict(self): + dictionary = { + "name": self.__class__.__name__, + "extent": list(self.extent), + "size": self.size, + "xbin": self.xbin, + "ybin": self.ybin, + } return dictionary - + def query_x_labels(self, x): """Return the x partition labels for the specified x coordinates.""" return query_labels(x, breaks=self.x_breaks, labels=self.x_labels).astype(str) - + def query_y_labels(self, y): """Return the y partition labels for the specified y coordinates.""" return query_labels(y, breaks=self.y_breaks, labels=self.y_labels).astype(str) - - def query_labels(self, x, y): + + def query_labels(self, x, y): """Return the partition labels for the specified x,y coordinates.""" return self.query_x_labels(x), self.query_y_labels(y) - + def query_x_midpoints(self, x): """Return the x partition midpoints for the specified x coordinates.""" return query_midpoints(x, breaks=self.x_breaks, midpoints=self.x_midpoints) - + def query_y_midpoints(self, y): """Return the y partition midpoints for the specified y coordinates.""" return query_midpoints(y, breaks=self.y_breaks, midpoints=self.y_midpoints) - - def query_midpoints(self, x, y): + + def query_midpoints(self, x, y): """Return the partition midpoints for the specified x,y coordinates.""" return self.query_x_midpoints(x), self.query_y_midpoints(y) @@ -474,8 +484,12 @@ def get_partitions_by_extent(self, extent): """Return the partitions labels containing data within the extent.""" extent = check_extent(extent) # Define valid query extent (to be aligned with partitioning extent) - query_extent = [max(extent.xmin, self.extent.xmin), min(extent.xmax, self.extent.xmax), - max(extent.ymin, self.extent.ymin), min(extent.ymax, self.extent.ymax)] + query_extent = [ + max(extent.xmin, self.extent.xmin), + min(extent.xmax, self.extent.xmax), + max(extent.ymin, self.extent.ymin), + min(extent.ymax, self.extent.ymax), + ] query_extent = Extent(*query_extent) # Retrieve midpoints xmin, xmax = self.query_x_midpoints([query_extent.xmin, query_extent.xmax]) @@ -494,33 +508,32 @@ def get_partitions_around_point(self, x, y, distance=None, size=None): @property def quadmesh(self): """Return the quadrilateral mesh. - - A quadrilateral mesh is a grid of M by N adjacent quadrilaterals that are defined via a (M+1, N+1) + + A quadrilateral mesh is a grid of M by N adjacent quadrilaterals that are defined via a (M+1, N+1) grid of vertices. - - The quadrilateral mesh is accepted by `matplotlib.pyplot.pcolormesh`, `matplotlib.collections.QuadMesh` + + The quadrilateral mesh is accepted by `matplotlib.pyplot.pcolormesh`, `matplotlib.collections.QuadMesh` `matplotlib.collections.PolyQuadMesh`. - + np.naddary Quadmesh array of shape (M+1, N+1, 2) """ x_corners, y_corners = np.meshgrid(self.x_breaks, self.y_breaks) return np.stack((x_corners, y_corners), axis=2) - - # to_yaml + + # to_yaml # to_shapely # to_spherically (geographic) # to_geopandas [lat_bin, lon_bin, geometry] - class GeographicPartitioning(XYPartitioning): """ Handles geographic partitioning of data based on longitude and latitude bin sizes within a defined extent. - + The last bin size (in lon and lat direction) might not be of size ``size` ! - - Parameters: + + Parameters ---------- size : float The uniform size for longitude and latitude binning. @@ -536,41 +549,36 @@ class GeographicPartitioning(XYPartitioning): ---------- XYPartitioning """ + def __init__(self, size, xbin="lon_bin", ybin="lat_bin", extent=[-180, 180, -90, 90]): - super().__init__(xbin=xbin, ybin=ybin, size=size, extent=extent) - + super().__init__(xbin=xbin, ybin=ybin, size=size, extent=extent) + def get_partitions_around_point(self, lon, lat, distance=None, size=None): - extent = get_geographic_extent_around_point(lon=lon, lat=lat, - distance=distance, - size=size, - distance_type="geographic") + extent = get_geographic_extent_around_point( + lon=lon, lat=lat, distance=distance, size=size, distance_type="geographic" + ) return self.get_partitions_by_extent(extent=extent) - + def get_partitions_by_country(self, name, padding=None): - extent = get_country_extent(name, padding=padding) + extent = get_country_extent(name, padding=padding) return self.get_partitions_by_extent(extent=extent) - + def get_partitions_by_continent(self, name, padding=None): - extent = get_continent_extent(name, padding=padding) + extent = get_continent_extent(name, padding=padding) return self.get_partitions_by_extent(extent=extent) - + @check_valid_dataframe def to_xarray(self, df, new_x="lon", new_y="lat"): - return df_to_xarray(df, - xbin=self.xbin, - ybin=self.ybin, - size=self.size, - extent=self.extent, - new_x=new_x, - new_y=new_y) - + return df_to_xarray( + df, xbin=self.xbin, ybin=self.ybin, size=self.size, extent=self.extent, new_x=new_x, new_y=new_y + ) class TilePartitioning: """ Handles partitioning of data into tiles within a specified extent. - Parameters: + Parameters ---------- size : float The size of the tiles. @@ -580,25 +588,32 @@ class TilePartitioning: Identifier for the tile bin. The default is ``'tile_id'``. """ + def __init__(self, size, extent, tile_id="tile_id"): self.size = _check_size(size) self.extent = check_extent(extent) self.tile_id = tile_id - + @property def bins(self): return [self.tile_id] - + @check_valid_dataframe - def add_partitions(self, df, x, y): - if isinstance(df, pd.DataFrame): - return add_pandas_xy_tile_partitions(df, x=x, y=x, - size=self.size, - extent=self.extent, - tile_id=self.tile_id, - ) - return add_polars_xy_tile_partitions(df, x=x, y=x, - size=self.size, - extent=self.extent, - tile_id=self.tile_id, - ) \ No newline at end of file + def add_partitions(self, df, x, y): + if isinstance(df, pd.DataFrame): + return add_pandas_xy_tile_partitions( + df, + x=x, + y=x, + size=self.size, + extent=self.extent, + tile_id=self.tile_id, + ) + return add_polars_xy_tile_partitions( + df, + x=x, + y=x, + size=self.size, + extent=self.extent, + tile_id=self.tile_id, + ) diff --git a/gpm/tests/test_bucket/test_partitioning.py b/gpm/tests/test_bucket/test_partitioning.py index 34c18afd..55a0d039 100644 --- a/gpm/tests/test_bucket/test_partitioning.py +++ b/gpm/tests/test_bucket/test_partitioning.py @@ -1,23 +1,23 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- """ Created on Thu May 23 12:19:17 2024 @author: ghiggi """ +import numpy as np +import pandas as pd +import polars as pl import pytest -import pandas as pd -import numpy as np import xarray as xr -import polars as pl -from gpm.bucket.partitioning import XYPartitioning + from gpm.bucket.partitioning import ( - get_n_decimals, + XYPartitioning, + get_array_combinations, get_breaks, - get_labels, - get_midpoints, get_breaks_and_labels, - get_array_combinations, + get_labels, + get_midpoints, + get_n_decimals, ) @@ -37,11 +37,11 @@ def test_get_breaks(): def test_get_labels(): """Verify correct label generation.""" labels = get_labels(0.5, 0, 2) - expected_labels = ['0.25', '0.75', '1.25', '1.75'] + expected_labels = ["0.25", "0.75", "1.25", "1.75"] assert labels.tolist() == expected_labels - + labels = get_labels(0.999, 0, 2) - expected_labels =['0.4995', '1.4985', '2.4975'] + expected_labels = ["0.4995", "1.4985", "2.4975"] assert labels.tolist() == expected_labels @@ -50,34 +50,29 @@ def test_get_midpoints(): midpoints = get_midpoints(0.5, 0, 2) expected_midpoints = [0.25, 0.75, 1.25, 1.75] np.testing.assert_allclose(midpoints, expected_midpoints) - + midpoints = get_midpoints(0.999, 0, 2) expected_midpoints = [0.4995, 1.4985, 2.4975] np.testing.assert_allclose(midpoints, expected_midpoints) - + def test_get_breaks_and_labels(): """Ensure both breaks and labels are returned and accurate.""" breaks, labels = get_breaks_and_labels(0.5, 0, 2) assert np.array_equal(breaks, np.array([0, 0.5, 1.0, 1.5, 2])) - assert labels.tolist() == ['0.25', '0.75', '1.25', '1.75'] + assert labels.tolist() == ["0.25", "0.75", "1.25", "1.75"] def test_get_array_combinations(): x = np.array([1, 2, 3]) y = np.array([4, 5]) - expected_result = np.array( - [[1, 4], - [2, 4], - [3, 4], - [1, 5], - [2, 5], - [3, 5]]) - np.testing.assert_allclose(get_array_combinations(x,y), expected_result) + expected_result = np.array([[1, 4], [2, 4], [3, 4], [1, 5], [2, 5], [3, 5]]) + np.testing.assert_allclose(get_array_combinations(x, y), expected_result) + class TestXYPartitioning: """Tests for the XYPartitioning class.""" - + def test_initialization(self): """Test proper initialization of XYPartitioning objects.""" partitioning = XYPartitioning(xbin="xbin", ybin="ybin", size=(1, 2), extent=[0, 10, 0, 10]) @@ -87,13 +82,13 @@ def test_initialization(self): assert partitioning.shape == (10, 5) assert partitioning.n_partitions == 50 assert partitioning.n_x == 10 - assert partitioning.n_y == 5 - np.testing.assert_allclose(partitioning.x_breaks, [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) - np.testing.assert_allclose(partitioning.y_breaks, [ 0, 2, 4, 6, 8, 10]) - np.testing.assert_allclose(partitioning.x_midpoints, [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]) - np.testing.assert_allclose(partitioning.y_midpoints, [1., 3., 5., 7., 9.]) - assert partitioning.x_labels.tolist() == ['0.5', '1.5', '2.5', '3.5', '4.5', '5.5', '6.5', '7.5', '8.5', '9.5'] - assert partitioning.y_labels.tolist() == ['1.0', '3.0', '5.0', '7.0', '9.0'] + assert partitioning.n_y == 5 + np.testing.assert_allclose(partitioning.x_breaks, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + np.testing.assert_allclose(partitioning.y_breaks, [0, 2, 4, 6, 8, 10]) + np.testing.assert_allclose(partitioning.x_midpoints, [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]) + np.testing.assert_allclose(partitioning.y_midpoints, [1.0, 3.0, 5.0, 7.0, 9.0]) + assert partitioning.x_labels.tolist() == ["0.5", "1.5", "2.5", "3.5", "4.5", "5.5", "6.5", "7.5", "8.5", "9.5"] + assert partitioning.y_labels.tolist() == ["1.0", "3.0", "5.0", "7.0", "9.0"] def test_invalid_initialization(self): """Test initialization with invalid extent and size.""" @@ -102,232 +97,210 @@ def test_invalid_initialization(self): with pytest.raises(TypeError): XYPartitioning(xbin="xbin", ybin="ybin", size="invalid", extent=[0, 10, 0, 10]) - + def test_add_partitions_pandas(self): """Test valid partitions are added to a pandas dataframe.""" # Create test dataframe - df = pd.DataFrame({ - 'x': [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], - 'y': [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], - }) + df = pd.DataFrame( + { + "x": [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], + "y": [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], + } + ) # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) # Add partitions df_out = partitioning.add_partitions(df, x="x", y="y", remove_invalid_rows=True) - # Test results + # Test results expected_xbin = [0.25, 0.25, 0.25, 0.75, 1.25, 1.75] expected_ybin = [0.125, 0.125, 0.375, 0.875, 1.375, 1.875] assert df_out["my_xbin"].dtype.name == "category", "X bin are not of categorical type." assert df_out["my_ybin"].dtype.name == "category", "Y bin are not of categorical type." - assert df_out['my_xbin'].astype(float).tolist() == expected_xbin, "X bin are incorrect." - assert df_out['my_ybin'].astype(float).tolist() == expected_ybin, "Y bin are incorrect." - + assert df_out["my_xbin"].astype(float).tolist() == expected_xbin, "X bin are incorrect." + assert df_out["my_ybin"].astype(float).tolist() == expected_ybin, "Y bin are incorrect." + def test_add_partitions_polars(self): """Test valid partitions are added to a polars dataframe.""" # Create test dataframe - df = pl.DataFrame(pd.DataFrame({ - 'x': [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], - 'y': [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], - })) + df = pl.DataFrame( + pd.DataFrame( + { + "x": [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], + "y": [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], + } + ) + ) # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) # Add partitions df_out = partitioning.add_partitions(df, x="x", y="y", remove_invalid_rows=True) - - # Test results + + # Test results expected_xbin = [0.25, 0.25, 0.25, 0.75, 1.25, 1.75] expected_ybin = [0.125, 0.125, 0.375, 0.875, 1.375, 1.875] - assert df_out['my_xbin'].dtype == pl.datatypes.Categorical, "X bin are not of categorical type." - assert df_out['my_ybin'].dtype == pl.datatypes.Categorical, "X bin are not of categorical type." - assert df_out['my_xbin'].cast(float).to_list() == expected_xbin, "X bin are incorrect." - assert df_out['my_ybin'].cast(float).to_list() == expected_ybin, "Y bin are incorrect." + assert df_out["my_xbin"].dtype == pl.datatypes.Categorical, "X bin are not of categorical type." + assert df_out["my_ybin"].dtype == pl.datatypes.Categorical, "X bin are not of categorical type." + assert df_out["my_xbin"].cast(float).to_list() == expected_xbin, "X bin are incorrect." + assert df_out["my_ybin"].cast(float).to_list() == expected_ybin, "Y bin are incorrect." def test_to_xarray(self): """Test valid partitions are added to a pandas dataframe.""" # Create test dataframe - df = pd.DataFrame({ - 'x': [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], - 'y': [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], - }) + df = pd.DataFrame( + { + "x": [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], + "y": [-0.001, -0.0, 0, 0.5, 1.0, 1.5, 2.0, 2.1, np.nan], + } + ) # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) # Add partitions df = partitioning.add_partitions(df, x="x", y="y", remove_invalid_rows=True) - + # Aggregate by partitions df_grouped = df.groupby(partitioning.partitions, observed=True).median() df_grouped["dummy_var"] = 2 - + # Convert to Dataset ds = partitioning.to_xarray(df_grouped, new_x="lon", new_y="lat") - - # Test results + + # Test results expected_xbin = [0.25, 0.75, 1.25, 1.75] expected_ybin = [0.125, 0.375, 0.625, 0.875, 1.125, 1.375, 1.625, 1.875] assert isinstance(ds, xr.Dataset), "Not a xr.Dataset" - assert ds["lon"].data.dtype.name != 'object', "xr.Dataset coordinates should not be a string." - assert ds["lat"].data.dtype.name != 'object', "xr.Dataset coordinates should not be a string." - assert ds["lon"].data.dtype.name == 'float64', "xr.Dataset coordinates are not float64." - assert ds["lat"].data.dtype.name == 'float64', "xr.Dataset coordinates are not float64." + assert ds["lon"].data.dtype.name != "object", "xr.Dataset coordinates should not be a string." + assert ds["lat"].data.dtype.name != "object", "xr.Dataset coordinates should not be a string." + assert ds["lon"].data.dtype.name == "float64", "xr.Dataset coordinates are not float64." + assert ds["lat"].data.dtype.name == "float64", "xr.Dataset coordinates are not float64." assert "dummy_var" in ds, "The x columns has not become a xr.Dataset variable" - + def test_query_labels(self): - """Test valid labels queries.""" + """Test valid labels queries.""" # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) - # Test results - assert partitioning.query_x_labels(1).tolist() == ['0.75'] - assert partitioning.query_y_labels(1).tolist() == ['0.875'] - assert partitioning.query_x_labels(np.array(1)).tolist() == ['0.75'] - assert partitioning.query_x_labels(np.array([1])).tolist() == ['0.75'] - assert partitioning.query_x_labels(np.array([1, 1])).tolist() == ['0.75', '0.75'] - assert partitioning.query_x_labels([1, 1]).tolist() == ['0.75', '0.75'] - - x_labels, y_labels = partitioning.query_labels([1,2], [0,1]) - assert x_labels.tolist() == ['0.75', '1.75'] - assert y_labels.tolist() == ['0.125', '0.875'] - - # Test out of extent - assert partitioning.query_x_labels([-1, 1]).tolist() == ['nan', '0.75'] - - # Test with input nan - assert partitioning.query_x_labels(np.nan).tolist() == ['nan'] - assert partitioning.query_x_labels(None).tolist() == ['nan'] - - # Test with input string + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) + # Test results + assert partitioning.query_x_labels(1).tolist() == ["0.75"] + assert partitioning.query_y_labels(1).tolist() == ["0.875"] + assert partitioning.query_x_labels(np.array(1)).tolist() == ["0.75"] + assert partitioning.query_x_labels(np.array([1])).tolist() == ["0.75"] + assert partitioning.query_x_labels(np.array([1, 1])).tolist() == ["0.75", "0.75"] + assert partitioning.query_x_labels([1, 1]).tolist() == ["0.75", "0.75"] + + x_labels, y_labels = partitioning.query_labels([1, 2], [0, 1]) + assert x_labels.tolist() == ["0.75", "1.75"] + assert y_labels.tolist() == ["0.125", "0.875"] + + # Test out of extent + assert partitioning.query_x_labels([-1, 1]).tolist() == ["nan", "0.75"] + + # Test with input nan + assert partitioning.query_x_labels(np.nan).tolist() == ["nan"] + assert partitioning.query_x_labels(None).tolist() == ["nan"] + + # Test with input string with pytest.raises(ValueError): partitioning.query_x_labels("dummy") - + def test_query_midpoints(self): - """Test valid midpoint queries.""" - + """Test valid midpoint queries.""" # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) - # Test results + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) + # Test results np.testing.assert_allclose(partitioning.query_x_midpoints(1), [0.75]) np.testing.assert_allclose(partitioning.query_y_midpoints(1).tolist(), [0.875]) - np.testing.assert_allclose( partitioning.query_x_midpoints(np.array(1)), [0.75]) - np.testing.assert_allclose( partitioning.query_x_midpoints(np.array([1])), [0.75]) - np.testing.assert_allclose( partitioning.query_x_midpoints(np.array([1, 1])), [0.75, 0.75]) - np.testing.assert_allclose( partitioning.query_x_midpoints([1, 1]), [0.75, 0.75]) - - x_midpoints, y_midpoints = partitioning.query_midpoints([1,2], [0,1]) + np.testing.assert_allclose(partitioning.query_x_midpoints(np.array(1)), [0.75]) + np.testing.assert_allclose(partitioning.query_x_midpoints(np.array([1])), [0.75]) + np.testing.assert_allclose(partitioning.query_x_midpoints(np.array([1, 1])), [0.75, 0.75]) + np.testing.assert_allclose(partitioning.query_x_midpoints([1, 1]), [0.75, 0.75]) + + x_midpoints, y_midpoints = partitioning.query_midpoints([1, 2], [0, 1]) np.testing.assert_allclose(x_midpoints.tolist(), [0.75, 1.75]) np.testing.assert_allclose(y_midpoints.tolist(), [0.125, 0.875]) - # Test out of extent + # Test out of extent np.testing.assert_allclose(partitioning.query_x_midpoints([-1, 1]), [np.nan, 0.75]) - + # Test with input nan or None - np.testing.assert_allclose( partitioning.query_x_midpoints(np.nan).tolist(), [np.nan]) - np.testing.assert_allclose( partitioning.query_x_midpoints(None).tolist(), [np.nan]) - - # Test with input string + np.testing.assert_allclose(partitioning.query_x_midpoints(np.nan).tolist(), [np.nan]) + np.testing.assert_allclose(partitioning.query_x_midpoints(None).tolist(), [np.nan]) + + # Test with input string with pytest.raises(ValueError): partitioning.query_x_midpoints("dummy") def test_get_partitions_by_extent(self): - """Test get_partitions_by_extent.""" + """Test get_partitions_by_extent.""" # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) - # Test results with extent within + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) + # Test results with extent within new_extent = [0, 0.5, 0, 0.5] labels = partitioning.get_partitions_by_extent(new_extent) - expected_labels = np.array([['0.25', '0.125'], - ['0.25', '0.375']]) - assert expected_labels.tolist() == labels.tolist() - - # Test results with extent outside + expected_labels = np.array([["0.25", "0.125"], ["0.25", "0.375"]]) + assert expected_labels.tolist() == labels.tolist() + + # Test results with extent outside new_extent = [3, 4, 3, 4] labels = partitioning.get_partitions_by_extent(new_extent) assert labels.size == 0 - - # Test results with extent partially overlapping - new_extent = [1.5, 4, 1.75, 4] # BUG + + # Test results with extent partially overlapping + new_extent = [1.5, 4, 1.75, 4] # BUG labels = partitioning.get_partitions_by_extent(new_extent) - expected_labels = np.array([['1.25', '1.625'], - ['1.75', '1.625'], - ['1.25', '1.875'], - ['1.75', '1.875']]) - assert expected_labels.tolist() == labels.tolist() - + expected_labels = np.array([["1.25", "1.625"], ["1.75", "1.625"], ["1.25", "1.875"], ["1.75", "1.875"]]) + assert expected_labels.tolist() == labels.tolist() + def test_get_partitions_around_point(self): - """Test get_partitions_around_point.""" - # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) - # Test results with point within - labels = partitioning.get_partitions_around_point(x=1, y=1, distance=0) - assert labels.tolist() == [['0.75', '0.875']] - - # Test results with point outside - labels = partitioning.get_partitions_around_point(x=3, y=3, distance=0) - assert labels.size == 0 - - # Test results with point outside but area within - labels = partitioning.get_partitions_around_point(x=3, y=3, distance=1) - assert labels.tolist() == [['1.75', '1.875']] - + """Test get_partitions_around_point.""" + # Create partitioning + size = (0.5, 0.25) + extent = [0, 2, 0, 2] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) + # Test results with point within + labels = partitioning.get_partitions_around_point(x=1, y=1, distance=0) + assert labels.tolist() == [["0.75", "0.875"]] + + # Test results with point outside + labels = partitioning.get_partitions_around_point(x=3, y=3, distance=0) + assert labels.size == 0 + + # Test results with point outside but area within + labels = partitioning.get_partitions_around_point(x=3, y=3, distance=1) + assert labels.tolist() == [["1.75", "1.875"]] + def test_quadmesh(self): - """Test quadmesh.""" - size=(1, 1) - extent=[0, 2, 1, 3] - partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", - size=size, extent=extent) - # Test results - assert partitioning.quadmesh.shape == (3, 3, 2) - x_mesh = np.array([[0, 1, 2], - [0, 1, 2], - [0, 1, 2]]) - y_mesh = np.array([[1, 1, 1], - [2, 2, 2], - [3, 3, 3]]) - np.testing.assert_allclose(partitioning.quadmesh[:,:, 0], x_mesh) - np.testing.assert_allclose(partitioning.quadmesh[:,:, 1], y_mesh) - - # TODO: origin: y: bottom or upper (RIGHT NOW UPPER !) # BUG: increase by descinding - - + """Test quadmesh.""" + size = (1, 1) + extent = [0, 2, 1, 3] + partitioning = XYPartitioning(xbin="my_xbin", ybin="my_ybin", size=size, extent=extent) + # Test results + assert partitioning.quadmesh.shape == (3, 3, 2) + x_mesh = np.array([[0, 1, 2], [0, 1, 2], [0, 1, 2]]) + y_mesh = np.array([[1, 1, 1], [2, 2, 2], [3, 3, 3]]) + np.testing.assert_allclose(partitioning.quadmesh[:, :, 0], x_mesh) + np.testing.assert_allclose(partitioning.quadmesh[:, :, 1], y_mesh) + + # TODO: origin: y: bottom or upper (RIGHT NOW UPPER !) # BUG: increase by descinding + def test_to_dict(self): # Create partitioning - size=(0.5, 0.25) - extent=[0, 2, 0, 2] + size = (0.5, 0.25) + extent = [0, 2, 0, 2] xbin = "my_xbin" ybin = "my_ybin" - partitioning = XYPartitioning(xbin=xbin, ybin=ybin, - size=size, extent=extent) + partitioning = XYPartitioning(xbin=xbin, ybin=ybin, size=size, extent=extent) # Test results - expected_dict = {"name": "XYPartitioning", - "extent": extent, - "size": size, - "xbin": xbin, - "ybin": ybin} + expected_dict = {"name": "XYPartitioning", "extent": extent, "size": size, "xbin": xbin, "ybin": ybin} assert partitioning.to_dict() == expected_dict - - - - - - - \ No newline at end of file