Merge pull request #117 from astronomy-commons/delucchi/012
Refactor based on hipscat v0.1.2
delucchi-cmu authored Aug 15, 2023
2 parents 7ae05c0 + 450d826 commit 8e3314c
Showing 5 changed files with 24 additions and 21 deletions.
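
The common thread across all five files: direct pandas I/O calls (`pd.read_parquet`, `pd.read_csv`) are replaced with the `file_io` wrappers that hipscat v0.1.2 provides, which accept `FilePointer` objects. A minimal before/after sketch of the pattern; the file name is hypothetical, not a path from this repository:

```python
import pandas as pd
from hipscat.io import FilePointer, file_io

schema_path = "my_schema.parquet"  # hypothetical file name, for illustration

# Before this PR: pandas reads the path directly.
before = pd.read_parquet(schema_path, dtype_backend="numpy_nullable")

# After this PR: the hipscat wrapper takes a FilePointer and forwards
# keyword arguments, so call sites no longer depend on pandas directly.
after = file_io.load_parquet_to_pandas(
    FilePointer(schema_path), dtype_backend="numpy_nullable"
)
```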
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -19,7 +19,7 @@ dependencies = [
     "dask[distributed]",
     "deprecated",
     "healpy",
-    "hipscat >= 0.1.1",
+    "hipscat >= 0.1.2",
     "ipykernel", # Support for Jupyter notebooks
     "pandas",
     "pyarrow",
11 changes: 6 additions & 5 deletions src/hipscat_import/catalog/file_readers.py
@@ -2,11 +2,10 @@
 
 import abc
 
-import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 from astropy.table import Table
-from hipscat.io import file_io
+from hipscat.io import FilePointer, file_io
 
 # pylint: disable=too-few-public-methods,too-many-arguments

@@ -141,7 +140,9 @@ def read(self, input_file):
         self.regular_file_exists(input_file)
 
         if self.schema_file:
-            schema_parquet = pd.read_parquet(self.schema_file, dtype_backend="numpy_nullable")
+            schema_parquet = file_io.load_parquet_to_pandas(
+                FilePointer(self.schema_file), dtype_backend="numpy_nullable"
+            )
 
         use_column_names = None
         if self.column_names:

@@ -155,8 +156,8 @@
         elif self.schema_file:
             use_type_map = schema_parquet.dtypes.to_dict()
 
-        with pd.read_csv(
-            input_file,
+        with file_io.load_csv_to_pandas(
+            FilePointer(input_file),
             chunksize=self.chunksize,
             sep=self.separator,
             header=self.header,
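
The diff truncates the `with` block above. A sketch of how a chunked read through the new wrapper plausibly continues, assuming `load_csv_to_pandas` forwards its keyword arguments to `pandas.read_csv` as the call site suggests; the loop body and file name are illustrative:

```python
from hipscat.io import FilePointer, file_io

# With a chunksize, pandas returns an iterator of DataFrames rather than
# a single frame; the call site above wraps it in a context manager.
with file_io.load_csv_to_pandas(
    FilePointer("input_catalog.csv"),  # hypothetical input file
    chunksize=500_000,
    sep=",",
) as reader:
    for chunk in reader:
        print(len(chunk))  # stand-in for the real per-chunk processing
```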
3 changes: 1 addition & 2 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -1,5 +1,4 @@
 import healpy as hp
-import pandas as pd
 import pyarrow.dataset as ds
 from hipscat import pixel_math
 from hipscat.io import file_io, paths

@@ -17,7 +16,7 @@ def map_pixel_shards(
     dec_column,
 ):
     """Creates margin cache shards from a source partition file."""
-    data = pd.read_parquet(partition_file)
+    data = file_io.load_parquet_to_pandas(partition_file)
 
     data["margin_pixel"] = hp.ang2pix(
         2**margin_order,
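
For context on the `hp.ang2pix` call the diff truncates: healpy maps sky coordinates to HEALPix pixel indices at NSIDE = 2**margin_order. A standalone sketch with made-up coordinates; the `nest=True, lonlat=True` arguments are an assumption about the elided part of the call, reflecting hipscat's NESTED pixel ordering:

```python
import healpy as hp
import numpy as np

margin_order = 3  # hypothetical HEALPix order, for illustration only
ra = np.array([45.0, 120.5])    # right ascension in degrees (made up)
dec = np.array([-30.0, 12.25])  # declination in degrees (made up)

# lonlat=True lets us pass RA/Dec in degrees instead of co-latitude and
# longitude in radians; nest=True selects NESTED pixel ordering.
margin_pixel = hp.ang2pix(2**margin_order, ra, dec, nest=True, lonlat=True)
print(margin_pixel)  # one pixel index per input coordinate
```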
10 changes: 6 additions & 4 deletions src/hipscat_import/soap/map_reduce.py
@@ -17,7 +17,9 @@ def _count_joins_for_object(source_data, object_catalog_dir, object_id_column, o
         pixel_order=object_pixel.order,
         pixel_number=object_pixel.pixel,
     )
-    object_data = pd.read_parquet(path=object_path, columns=[object_id_column]).set_index(object_id_column)
+    object_data = file_io.load_parquet_to_pandas(object_path, columns=[object_id_column]).set_index(
+        object_id_column
+    )
 
     joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True)

@@ -64,9 +66,9 @@ def count_joins(
         pixel_order=source_pixel.order,
         pixel_number=source_pixel.pixel,
     )
-    source_data = pd.read_parquet(path=source_path, columns=[soap_args.source_object_id_column]).set_index(
-        soap_args.source_object_id_column
-    )
+    source_data = file_io.load_parquet_to_pandas(
+        source_path, columns=[soap_args.source_object_id_column]
+    ).set_index(soap_args.source_object_id_column)
 
     remaining_sources = len(source_data)
     results = []
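
The join these functions count is a plain pandas inner merge on both indexes. A self-contained sketch with made-up miniature frames standing in for the source and object partitions:

```python
import pandas as pd

# Hypothetical stand-ins: both frames are indexed by an object id,
# as after the .set_index(...) calls above.
source_data = pd.DataFrame({"mjd": [59000.1, 59000.2, 59000.3]}, index=[1, 1, 7])
object_data = pd.DataFrame(index=[1, 2, 3])

# Inner join on the indexes keeps only source rows whose object id
# also appears in the object partition.
joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True)
print(len(joined_data))  # 2: the two source rows with id 1
```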
19 changes: 10 additions & 9 deletions tests/hipscat_import/soap/test_soap_resume_plan.py
@@ -50,16 +50,17 @@ def test_object_to_source_map(small_sky_object_catalog, small_sky_source_catalog
 
 def test_mismatch_order_map(catalog_info_data, source_catalog_info):
     """Create some catalogs that will exercise edge case behavior of map-generation."""
-    catalog_pixels = pd.DataFrame(
-        data=[[1, 0, 16], [2, 0, 68], [2, 0, 69], [2, 0, 70], [2, 0, 71]],
-        columns=["Norder", "Dir", "Npix"],
-    )
-    object_catalog = Catalog(CatalogInfo(**catalog_info_data), catalog_pixels)
-    catalog_pixels = pd.DataFrame(
-        data=[[1, 0, 16]],
-        columns=["Norder", "Dir", "Npix"],
+    object_catalog = Catalog(
+        CatalogInfo(**catalog_info_data),
+        [
+            HealpixPixel(1, 16),
+            HealpixPixel(2, 68),
+            HealpixPixel(2, 69),
+            HealpixPixel(2, 70),
+            HealpixPixel(2, 71),
+        ],
     )
-    source_catalog = Catalog(CatalogInfo(**source_catalog_info), catalog_pixels)
+    source_catalog = Catalog(CatalogInfo(**source_catalog_info), [HealpixPixel(1, 16)])
 
     expected = {
         HealpixPixel(1, 16): [
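
In hipscat v0.1.2, `Catalog` thus takes a list of `HealpixPixel(order, pixel)` objects in place of the old `Norder`/`Dir`/`Npix` DataFrame. A sketch of the correspondence; the `dataframe_to_pixels` helper is hypothetical, written only to make the mapping explicit, and the import path is assumed for hipscat v0.1.x:

```python
import pandas as pd
from hipscat.pixel_math import HealpixPixel  # import path assumed

def dataframe_to_pixels(catalog_pixels: pd.DataFrame) -> list:
    """Hypothetical helper: the old Norder/Dir/Npix frame expressed as the
    list of HealpixPixel objects the new constructor expects. Dir is
    derivable from Npix, so it carries no extra information."""
    return [
        HealpixPixel(row.Norder, row.Npix)
        for row in catalog_pixels.itertuples()
    ]

old_style = pd.DataFrame(
    data=[[1, 0, 16], [2, 0, 68]],
    columns=["Norder", "Dir", "Npix"],
)
print(dataframe_to_pixels(old_style))  # two HealpixPixel objects, orders 1 and 2
```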
