From 450d826035f270ec7ab5f3948a12d17613db6681 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Thu, 10 Aug 2023 20:07:07 -0400 Subject: [PATCH] Refactor based on hipscat v0.1.2 --- pyproject.toml | 2 +- src/hipscat_import/catalog/file_readers.py | 11 ++++++----- .../margin_cache/margin_cache_map_reduce.py | 3 +-- src/hipscat_import/soap/map_reduce.py | 10 ++++++---- .../soap/test_soap_resume_plan.py | 19 ++++++++++--------- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fbb97f8c..4d0703b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ dependencies = [ "dask[distributed]", "deprecated", "healpy", - "hipscat >= 0.1.1", + "hipscat >= 0.1.2", "ipykernel", # Support for Jupyter notebooks "pandas", "pyarrow", diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 8040aea7..97359271 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -2,11 +2,10 @@ import abc -import pandas as pd import pyarrow as pa import pyarrow.parquet as pq from astropy.table import Table -from hipscat.io import file_io +from hipscat.io import FilePointer, file_io # pylint: disable=too-few-public-methods,too-many-arguments @@ -141,7 +140,9 @@ def read(self, input_file): self.regular_file_exists(input_file) if self.schema_file: - schema_parquet = pd.read_parquet(self.schema_file, dtype_backend="numpy_nullable") + schema_parquet = file_io.load_parquet_to_pandas( + FilePointer(self.schema_file), dtype_backend="numpy_nullable" + ) use_column_names = None if self.column_names: @@ -155,8 +156,8 @@ def read(self, input_file): elif self.schema_file: use_type_map = schema_parquet.dtypes.to_dict() - with pd.read_csv( - input_file, + with file_io.load_csv_to_pandas( + FilePointer(input_file), chunksize=self.chunksize, sep=self.separator, header=self.header, diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index 3b286b8b..0b94e10a 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -1,5 +1,4 @@ import healpy as hp -import pandas as pd import pyarrow.dataset as ds from hipscat import pixel_math from hipscat.io import file_io, paths @@ -17,7 +16,7 @@ def map_pixel_shards( dec_column, ): """Creates margin cache shards from a source partition file.""" - data = pd.read_parquet(partition_file) + data = file_io.load_parquet_to_pandas(partition_file) data["margin_pixel"] = hp.ang2pix( 2**margin_order, diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index 14d5aec2..c63c667b 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -17,7 +17,9 @@ def _count_joins_for_object(source_data, object_catalog_dir, object_id_column, o pixel_order=object_pixel.order, pixel_number=object_pixel.pixel, ) - object_data = pd.read_parquet(path=object_path, columns=[object_id_column]).set_index(object_id_column) + object_data = file_io.load_parquet_to_pandas(object_path, columns=[object_id_column]).set_index( + object_id_column + ) joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True) @@ -64,9 +66,9 @@ def count_joins( pixel_order=source_pixel.order, pixel_number=source_pixel.pixel, ) - source_data = pd.read_parquet(path=source_path, columns=[soap_args.source_object_id_column]).set_index( - soap_args.source_object_id_column - ) + source_data = file_io.load_parquet_to_pandas( + source_path, columns=[soap_args.source_object_id_column] + ).set_index(soap_args.source_object_id_column) remaining_sources = len(source_data) results = [] diff --git a/tests/hipscat_import/soap/test_soap_resume_plan.py b/tests/hipscat_import/soap/test_soap_resume_plan.py index c4d6282b..032d5ea9 100644 --- a/tests/hipscat_import/soap/test_soap_resume_plan.py +++ b/tests/hipscat_import/soap/test_soap_resume_plan.py @@ -50,16 +50,17 @@ def test_object_to_source_map(small_sky_object_catalog, small_sky_source_catalog def test_mismatch_order_map(catalog_info_data, source_catalog_info): """Create some catalogs that will exercise edge case behavior of map-generation.""" - catalog_pixels = pd.DataFrame( - data=[[1, 0, 16], [2, 0, 68], [2, 0, 69], [2, 0, 70], [2, 0, 71]], - columns=["Norder", "Dir", "Npix"], - ) - object_catalog = Catalog(CatalogInfo(**catalog_info_data), catalog_pixels) - catalog_pixels = pd.DataFrame( - data=[[1, 0, 16]], - columns=["Norder", "Dir", "Npix"], + object_catalog = Catalog( + CatalogInfo(**catalog_info_data), + [ + HealpixPixel(1, 16), + HealpixPixel(2, 68), + HealpixPixel(2, 69), + HealpixPixel(2, 70), + HealpixPixel(2, 71), + ], ) - source_catalog = Catalog(CatalogInfo(**source_catalog_info), catalog_pixels) + source_catalog = Catalog(CatalogInfo(**source_catalog_info), [HealpixPixel(1, 16)]) expected = { HealpixPixel(1, 16): [