Skip to content

Commit

Permalink
Merge pull request #175 from astronomy-commons/issue/149/generate-mar…
Browse files Browse the repository at this point in the history
…gin-from-dataframe

Generate margin catalog on `from_dataframe`
  • Loading branch information
camposandro authored Feb 27, 2024
2 parents efd2772 + a41ee2e commit 0ea299c
Show file tree
Hide file tree
Showing 24 changed files with 400 additions and 52 deletions.
55 changes: 7 additions & 48 deletions src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
import hipscat as hc
import numpy as np
import pandas as pd
from dask import delayed
from hipscat.catalog import CatalogType
from hipscat.catalog.catalog_info import CatalogInfo
from hipscat.pixel_math import HealpixPixel, generate_histogram
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, compute_hipscat_id, healpix_to_hipscat_id

from lsdb.catalog.catalog import Catalog
from lsdb.dask.divisions import get_pixels_divisions
from lsdb.loaders.dataframe.from_dataframe_utils import (
_append_partition_information_to_dataframe,
_generate_dask_dataframe,
)
from lsdb.types import DaskDFPixelMap, HealpixInfo

pd.options.mode.chained_assignment = None # default='warn'
Expand Down Expand Up @@ -160,9 +162,6 @@ def _generate_dask_df_and_map(
# Mapping HEALPix pixels to the respective Dataframe indices
ddf_pixel_map: Dict[HealpixPixel, int] = {}

# Dask Dataframe divisions
divisions = get_pixels_divisions(list(pixel_map.keys()))

for hp_pixel_index, hp_pixel_info in enumerate(pixel_map.items()):
hp_pixel, (_, pixels) = hp_pixel_info
# Store HEALPix pixel in map
Expand All @@ -171,28 +170,10 @@ def _generate_dask_df_and_map(
pixel_dfs.append(self._get_dataframe_for_healpix(hp_pixel, pixels))

# Generate Dask Dataframe with original schema
schema = pixel_dfs[0].iloc[:0, :].copy()
ddf, total_rows = self._generate_dask_dataframe(pixel_dfs, schema, divisions)
pixel_list = list(ddf_pixel_map.keys())
ddf, total_rows = _generate_dask_dataframe(pixel_dfs, pixel_list)
return ddf, ddf_pixel_map, total_rows

@staticmethod
def _generate_dask_dataframe(
    pixel_dfs: List[pd.DataFrame], schema: pd.DataFrame, divisions: Tuple[int, ...] | None
) -> Tuple[dd.DataFrame, int]:
    """Create the Dask Dataframe from the list of HEALPix pixel Dataframes

    Args:
        pixel_dfs (List[pd.DataFrame]): The list of HEALPix pixel Dataframes
        schema (pd.Dataframe): The original Dataframe schema
        divisions (Tuple[int, ...]): The partitions divisions

    Returns:
        The catalog's Dask Dataframe and its total number of rows.
    """
    # Every partition is an in-memory pandas object, so the row count is
    # summed directly instead of calling len(ddf), which would execute the
    # Dask graph only to count rows.
    total_rows = sum(len(df) for df in pixel_dfs)
    delayed_dfs = [delayed(df) for df in pixel_dfs]
    ddf = dd.from_delayed(delayed_dfs, meta=schema, divisions=divisions)
    # from_delayed may hand back a Series; callers always expect a DataFrame
    if not isinstance(ddf, dd.DataFrame):
        ddf = ddf.to_frame()
    return ddf, total_rows

def _get_dataframe_for_healpix(self, hp_pixel: HealpixPixel, pixels: List[int]) -> pd.DataFrame:
"""Computes the Pandas Dataframe containing the data points
for a certain HEALPix pixel.
Expand All @@ -215,26 +196,4 @@ def _get_dataframe_for_healpix(self, hp_pixel: HealpixPixel, pixels: List[int])
pixel_df = self.dataframe.loc[
(self.dataframe.index >= left_bound) & (self.dataframe.index < right_bound)
]
return self._append_partition_information_to_dataframe(pixel_df, hp_pixel)

def _append_partition_information_to_dataframe(
    self, dataframe: pd.DataFrame, pixel: HealpixPixel
) -> pd.DataFrame:
    """Appends partitioning information to a HEALPix dataframe

    The input dataframe is not modified; a new dataframe is returned.

    Args:
        dataframe (pd.Dataframe): A HEALPix's pandas dataframe
        pixel (HealpixPixel): The HEALPix pixel for the current partition

    Returns:
        The dataframe for a HEALPix, with data points and respective partition information.
    """
    ordered_columns = ["Norder", "Dir", "Npix"]
    # assign() builds a new frame, so the caller's dataframe (a slice of the
    # source dataframe) is never written to
    dataframe = dataframe.assign(Norder=pixel.order, Npix=pixel.pixel, Dir=pixel.dir)
    # Force new column types to int
    dataframe = dataframe.astype(dict.fromkeys(ordered_columns, int))
    # Reorder the columns to match full path
    data_columns = [col for col in dataframe.columns if col not in ordered_columns]
    return dataframe[data_columns + ordered_columns]
return _append_partition_information_to_dataframe(pixel_df, hp_pixel)
23 changes: 19 additions & 4 deletions src/lsdb/loaders/dataframe/from_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from lsdb.catalog import Catalog
from lsdb.loaders.dataframe.dataframe_catalog_loader import DataframeCatalogLoader
from lsdb.loaders.dataframe.margin_catalog_generator import MarginCatalogGenerator


def from_dataframe(
Expand All @@ -12,6 +13,8 @@ def from_dataframe(
highest_order: int = 5,
partition_size: int | None = None,
threshold: int | None = None,
margin_order: int | None = -1,
margin_threshold: float = 5.0,
**kwargs,
) -> Catalog:
"""Load a catalog from a Pandas Dataframe in CSV format.
Expand All @@ -22,12 +25,24 @@ def from_dataframe(
highest_order (int): The highest partition order
partition_size (int): The desired partition size, in number of rows
threshold (int): The maximum number of data points per pixel
margin_order (int): The order at which to generate the margin cache
margin_threshold (float): The size of the margin cache boundary, in arcseconds
**kwargs: Arguments to pass to the creation of the catalog info
Returns:
Catalog object loaded from the given parameters
"""
loader = DataframeCatalogLoader(
dataframe, lowest_order, highest_order, partition_size, threshold, **kwargs
)
return loader.load_catalog()
catalog = DataframeCatalogLoader(
dataframe,
lowest_order,
highest_order,
partition_size,
threshold,
**kwargs,
).load_catalog()
catalog.margin = MarginCatalogGenerator(
catalog,
margin_order,
margin_threshold,
).create_catalog()
return catalog
109 changes: 109 additions & 0 deletions src/lsdb/loaders/dataframe/from_dataframe_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from typing import List, Tuple

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask import delayed
from hipscat.catalog import PartitionInfo
from hipscat.pixel_math import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN

from lsdb.dask.divisions import get_pixels_divisions


def _generate_dask_dataframe(
    pixel_dfs: List[pd.DataFrame], pixels: List[HealpixPixel]
) -> Tuple[dd.DataFrame, int]:
    """Create the Dask Dataframe from the list of HEALPix pixel Dataframes

    Args:
        pixel_dfs (List[pd.DataFrame]): The list of HEALPix pixel Dataframes
        pixels (List[HealpixPixel]): The list of HEALPix pixels in the catalog

    Returns:
        The catalog's Dask Dataframe and its total number of rows.
    """
    # An empty copy of the first partition serves as the Dask meta (schema)
    schema = pixel_dfs[0].iloc[:0, :].copy()
    divisions = get_pixels_divisions(pixels)
    # Every partition is an in-memory pandas object, so the row count is
    # summed directly instead of calling len(ddf), which would execute the
    # Dask graph only to count rows.
    total_rows = sum(len(df) for df in pixel_dfs)
    delayed_dfs = [delayed(df) for df in pixel_dfs]
    ddf = dd.from_delayed(delayed_dfs, meta=schema, divisions=divisions)
    # from_delayed may hand back a Series; callers always expect a DataFrame
    if not isinstance(ddf, dd.DataFrame):
        ddf = ddf.to_frame()
    return ddf, total_rows


def _append_partition_information_to_dataframe(dataframe: pd.DataFrame, pixel: HealpixPixel) -> pd.DataFrame:
    """Attach the partition columns of a HEALPix pixel to its dataframe

    Args:
        dataframe (pd.Dataframe): The pandas dataframe with the points of a HEALPix pixel
        pixel (HealpixPixel): The HEALPix pixel the dataframe belongs to

    Returns:
        A new dataframe with the data points followed by the order, directory
        and pixel columns, arranged by `_order_partition_dataframe_columns`.
    """
    # One (column name, value, dtype) entry per piece of partition information
    partition_fields = (
        (PartitionInfo.METADATA_ORDER_COLUMN_NAME, pixel.order, np.uint8),
        (PartitionInfo.METADATA_DIR_COLUMN_NAME, pixel.dir, np.uint64),
        (PartitionInfo.METADATA_PIXEL_COLUMN_NAME, pixel.pixel, np.uint64),
    )
    new_values = {name: value for name, value, _ in partition_fields}
    new_dtypes = {name: dtype for name, _, dtype in partition_fields}
    with_partition = dataframe.assign(**new_values)
    return _order_partition_dataframe_columns(with_partition.astype(new_dtypes))


def _format_margin_partition_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Put a margin catalog partition dataframe into its final layout

    Args:
        dataframe (pd.DataFrame): The partition dataframe

    Returns:
        The margin partition dataframe, indexed and sorted by the hipscat id,
        with the data points followed by the pixel information columns.
    """
    order_col = PartitionInfo.METADATA_ORDER_COLUMN_NAME
    dir_col = PartitionInfo.METADATA_DIR_COLUMN_NAME
    pixel_col = PartitionInfo.METADATA_PIXEL_COLUMN_NAME

    # The current order/dir/pixel columns get a "margin_" prefix, while the
    # "partition_*" columns take over the plain order and pixel names
    renamed = dataframe.drop(columns=["margin_pixel"]).rename(
        columns={
            order_col: f"margin_{order_col}",
            dir_col: f"margin_{dir_col}",
            pixel_col: f"margin_{pixel_col}",
            "partition_order": order_col,
            "partition_pixel": pixel_col,
        }
    )
    # The directory is the pixel number rounded down to a multiple of 10000
    renamed[dir_col] = np.floor_divide(renamed[pixel_col].values, 10000) * 10000

    typed = renamed.astype({order_col: np.uint8, dir_col: np.uint64, pixel_col: np.uint64})
    indexed = typed.set_index(HIPSCAT_ID_COLUMN).sort_index()
    return _order_partition_dataframe_columns(indexed)


def _order_partition_dataframe_columns(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Move the pixel information columns to the end, in a fixed sequence

    Args:
        dataframe (pd.DataFrame): The partition dataframe

    Returns:
        The partition dataframe with all other columns first (original order
        kept), followed by whichever of the margin and partition pixel
        columns are present.
    """
    base_names = (
        PartitionInfo.METADATA_ORDER_COLUMN_NAME,
        PartitionInfo.METADATA_DIR_COLUMN_NAME,
        PartitionInfo.METADATA_PIXEL_COLUMN_NAME,
    )
    # Margin-prefixed columns come first, then the plain partition columns
    trailing_sequence = [f"margin_{name}" for name in base_names] + list(base_names)

    data_columns = [name for name in dataframe.columns if name not in trailing_sequence]
    pixel_columns = [name for name in trailing_sequence if name in dataframe.columns]
    return dataframe[data_columns + pixel_columns]
Loading

0 comments on commit 0ea299c

Please sign in to comment.