Skip to content

Commit

Permalink
Optionally write out all of the parquet files. (#181)
Browse files Browse the repository at this point in the history
* Optionally write out all of the parquet files.

* Scoot python versions forward (#182) (#183)

* Undo re-adding task key.
  • Loading branch information
delucchi-cmu authored Dec 6, 2023
1 parent 41695d9 commit b800040
Show file tree
Hide file tree
Showing 36 changed files with 341 additions and 39 deletions.
6 changes: 6 additions & 0 deletions src/hipscat_import/soap/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ class SoapArguments(RuntimeArguments):
## Input - Source catalog
source_catalog_dir: str = ""
source_object_id_column: str = ""
source_id_column: str = ""

resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to run the pipeline where we left off"""
write_leaf_files: bool = False
"""Should we also write out leaf parquet files (e.g. Norder/Dir/Npix.parquet)
that represent the full association table"""

compute_partition_size: int = 1_000_000_000

Expand Down Expand Up @@ -66,5 +70,7 @@ def additional_runtime_provenance_info(self) -> dict:
"object_id_column": self.object_id_column,
"source_catalog_dir": self.source_catalog_dir,
"source_object_id_column": self.source_object_id_column,
"source_id_column": self.source_id_column,
"compute_partition_size": self.compute_partition_size,
"write_leaf_files": self.write_leaf_files,
}
130 changes: 110 additions & 20 deletions src/hipscat_import/soap/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,58 @@

import numpy as np
import pandas as pd
from hipscat.io import file_io
from hipscat.io.paths import pixel_catalog_file
import pyarrow.parquet as pq
from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import get_healpix_pixel_from_metadata
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort

from hipscat_import.soap.arguments import SoapArguments
from hipscat_import.soap.resume_plan import SoapPlan


def _count_joins_for_object(source_data, object_catalog_dir, object_id_column, object_pixel):
object_path = pixel_catalog_file(
catalog_base_dir=object_catalog_dir,
def _get_pixel_directory(cache_path, pixel: HealpixPixel):
"""Create a path for intermediate pixel data."""
return file_io.append_paths_to_pointer(
cache_path, f"order_{pixel.order}", f"dir_{pixel.dir}", f"pixel_{pixel.pixel}"
)


def _count_joins_for_object(source_data, source_pixel, object_pixel, soap_args):
object_path = paths.pixel_catalog_file(
catalog_base_dir=soap_args.object_catalog_dir,
pixel_order=object_pixel.order,
pixel_number=object_pixel.pixel,
)
object_data = file_io.load_parquet_to_pandas(object_path, columns=[object_id_column]).set_index(
object_id_column
object_data = file_io.load_parquet_to_pandas(object_path, columns=[soap_args.object_id_column]).set_index(
soap_args.object_id_column
)

joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True)

return len(joined_data)
rows_written = len(joined_data)
if not soap_args.write_leaf_files or rows_written == 0:
return rows_written

pixel_dir = _get_pixel_directory(soap_args.tmp_path, object_pixel)
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"source_{source_pixel.order}_{source_pixel.pixel}.parquet"
)
joined_data = joined_data.reset_index()

joined_data["Norder"] = np.full(rows_written, fill_value=object_pixel.order, dtype=np.uint8)
joined_data["Dir"] = np.full(rows_written, fill_value=object_pixel.dir, dtype=np.uint32)
joined_data["Npix"] = np.full(rows_written, fill_value=object_pixel.pixel, dtype=np.uint32)

joined_data["join_Norder"] = np.full(rows_written, fill_value=source_pixel.order, dtype=np.uint8)
joined_data["join_Dir"] = np.full(rows_written, fill_value=source_pixel.dir, dtype=np.uint32)
joined_data["join_Npix"] = np.full(rows_written, fill_value=source_pixel.pixel, dtype=np.uint32)

joined_data.to_parquet(output_file, index=True)

return rows_written


def _write_count_results(cache_path, source_healpix, results):
Expand All @@ -48,9 +80,7 @@ def _write_count_results(cache_path, source_healpix, results):
)


def count_joins(
soap_args: SoapArguments, source_pixel: HealpixPixel, object_pixels: List[HealpixPixel], cache_path: str
):
def count_joins(soap_args: SoapArguments, source_pixel: HealpixPixel, object_pixels: List[HealpixPixel]):
"""Count the number of equijoined sources in the object pixels.
If any un-joined source pixels remain, stretch out to neighboring object pixels.
Expand All @@ -59,16 +89,19 @@ def count_joins(
source_pixel(HealpixPixel): order and pixel for the source catalog single pixel.
object_pixels(List[HealpixPixel]): set of tuples of order and pixel for the partitions
of the object catalog to be joined.
cache_path(str): path to write intermediate results CSV to.
"""
source_path = pixel_catalog_file(
source_path = paths.pixel_catalog_file(
catalog_base_dir=file_io.get_file_pointer_from_path(soap_args.source_catalog_dir),
pixel_order=source_pixel.order,
pixel_number=source_pixel.pixel,
)
source_data = file_io.load_parquet_to_pandas(
source_path, columns=[soap_args.source_object_id_column]
).set_index(soap_args.source_object_id_column)
if soap_args.write_leaf_files:
read_columns = [soap_args.source_object_id_column, soap_args.source_id_column]
else:
read_columns = [soap_args.source_object_id_column]
source_data = file_io.load_parquet_to_pandas(source_path, columns=read_columns).set_index(
soap_args.source_object_id_column
)

remaining_sources = len(source_data)
results = []
Expand All @@ -78,9 +111,9 @@ def count_joins(
break
join_count = _count_joins_for_object(
source_data,
soap_args.object_catalog_dir,
soap_args.object_id_column,
source_pixel,
object_pixel,
soap_args,
)
results.append([object_pixel.order, object_pixel.pixel, join_count])
remaining_sources -= join_count
Expand All @@ -89,17 +122,20 @@ def count_joins(
if remaining_sources > 0:
results.append([-1, -1, remaining_sources])

_write_count_results(cache_path, source_pixel, results)
_write_count_results(soap_args.tmp_path, source_pixel, results)


def combine_partial_results(input_path, output_path):
def combine_partial_results(input_path, output_path) -> int:
"""Combine many partial CSVs into single partition join info.
Also write out a debug file with counts of unmatched sources, if any.
Args:
input_path(str): intermediate directory with partial result CSVs. likely, the
directory used in the previous `count_joins` call as `cache_path`
output_path(str): directory to write the combined results CSVs.
Returns:
integer that is the sum of all matched num_rows.
"""
partial_files = file_io.find_files_matching_path(input_path, "**.csv")
partials = []
Expand Down Expand Up @@ -132,3 +168,57 @@ def combine_partial_results(input_path, output_path):
file_pointer=file_io.append_paths_to_pointer(output_path, "partition_info.csv"),
index=False,
)

join_info = PartitionJoinInfo(matched)
join_info.write_to_metadata_files(output_path)

return primary_only["num_rows"].sum()


def reduce_joins(
soap_args: SoapArguments, object_pixel: HealpixPixel, object_key: str, delete_input_files: bool = True
):
"""Reduce join tables into one parquet file per object-pixel, with one row-group
inside per source pixel."""
pixel_dir = _get_pixel_directory(soap_args.tmp_path, object_pixel)
# If there's no directory, this implies there were no matches to this object pixel
# earlier in the pipeline. Move on.
if not file_io.does_file_or_directory_exist(pixel_dir):
return
# Find all of the constituent files / source pixels. Create a list of PyArrow Tables from those
# parquet files. We need to know the schema before we create the ParquetWriter.
shard_file_list = file_io.find_files_matching_path(pixel_dir, "source**.parquet")

if len(shard_file_list) == 0:
return

## We want to order the row groups in a "breadth-first" sorting. Determine our sorting
## via the metadata, then read the tables in using that sorting.
healpix_pixels = []
for shard_file_name in shard_file_list:
healpix_pixels.append(
get_healpix_pixel_from_metadata(pq.read_metadata(shard_file_name), "join_Norder", "join_Npix")
)

argsort = get_pixel_argsort(healpix_pixels)
shard_file_list = np.array(shard_file_list)[argsort]

shards = []
for shard_file_name in shard_file_list:
shards.append(pq.read_table(shard_file_name))

# Write all of the shards into a single parquet file, one row-group-per-shard.
starting_catalog_path = FilePointer(str(soap_args.catalog_path))
destination_dir = paths.pixel_directory(starting_catalog_path, object_pixel.order, object_pixel.pixel)
file_io.make_directory(destination_dir, exist_ok=True)

output_file = paths.pixel_catalog_file(starting_catalog_path, object_pixel.order, object_pixel.pixel)
with pq.ParquetWriter(output_file, shards[0].schema) as writer:
for table in shards:
writer.write_table(table)

# Delete the intermediate shards.
if delete_input_files:
file_io.remove_directory(pixel_dir, ignore_errors=True)

SoapPlan.reducing_key_done(tmp_path=soap_args.tmp_path, reducing_key=object_key)
47 changes: 45 additions & 2 deletions src/hipscat_import/soap/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ class SoapPlan(PipelineResumePlan):

count_keys: List[Tuple[HealpixPixel, List[HealpixPixel], str]] = field(default_factory=list)
"""set of pixels (and job keys) that have yet to be counted"""
reduce_keys: List[Tuple[HealpixPixel, str]] = field(default_factory=list)
"""set of object catalog pixels (and job keys) that have yet to be reduced/combined"""
source_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
"""Map of object pixels to source pixels, with counting key."""
object_catalog: Catalog | None = None

COUNTING_STAGE = "counting"
REDUCING_STAGE = "reducing"
SOURCE_MAP_FILE = "source_object_map.npz"

def __init__(self, args: SoapArguments):
Expand All @@ -50,15 +54,20 @@ def gather_plan(self, args):
return
step_progress.update(1)

self.object_catalog = Catalog.read_from_hipscat(args.object_catalog_dir)
source_map_file = file_io.append_paths_to_pointer(self.tmp_path, self.SOURCE_MAP_FILE)
if file_io.does_file_or_directory_exist(source_map_file):
source_pixel_map = np.load(source_map_file, allow_pickle=True)["arr_0"].item()
else:
object_catalog = Catalog.read_from_hipscat(args.object_catalog_dir)
source_catalog = Catalog.read_from_hipscat(args.source_catalog_dir)
source_pixel_map = source_to_object_map(object_catalog, source_catalog)
source_pixel_map = source_to_object_map(self.object_catalog, source_catalog)
np.savez_compressed(source_map_file, source_pixel_map)
self.count_keys = self.get_sources_to_count(source_pixel_map=source_pixel_map)
self.reduce_keys = self.get_objects_to_reduce()
file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.REDUCING_STAGE),
exist_ok=True,
)
step_progress.update(1)

def wait_for_counting(self, futures):
Expand Down Expand Up @@ -98,6 +107,40 @@ def get_sources_to_count(self, source_pixel_map=None):
if f"{hp_pixel.order}_{hp_pixel.pixel}" not in counted_keys
]

@classmethod
def reducing_key_done(cls, tmp_path, reducing_key: str):
"""Mark a single reducing task as done
Args:
tmp_path (str): where to write intermediate resume files.
reducing_key (str): unique string for each reducing task (e.g. "3_57")
"""
cls.touch_key_done_file(tmp_path, cls.REDUCING_STAGE, reducing_key)

def get_objects_to_reduce(self):
"""Fetch a tuple for each object catalog pixel to reduce."""
reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE))
reduce_items = [
(hp_pixel, f"{hp_pixel.order}_{hp_pixel.pixel}")
for hp_pixel in self.object_catalog.get_healpix_pixels()
if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys
]
return reduce_items

def is_reducing_done(self) -> bool:
"""Are there partitions left to reduce?"""
return self.done_file_exists(self.REDUCING_STAGE)

def wait_for_reducing(self, futures):
"""Wait for reducing stage futures to complete."""
self.wait_for_futures(futures, self.REDUCING_STAGE)
remaining_sources_to_reduce = self.get_objects_to_reduce()
if len(remaining_sources_to_reduce) > 0:
raise RuntimeError(
f"{len(remaining_sources_to_reduce)} reducing stages did not complete successfully."
)
self.touch_stage_done_file(self.REDUCING_STAGE)


def source_to_object_map(object_catalog, source_catalog):
"""Build a map of (source order/pixel) to the (object order/pixel)
Expand Down
32 changes: 26 additions & 6 deletions src/hipscat_import/soap/run_soap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
The actual logic of the map reduce is in the `map_reduce.py` file.
"""

from hipscat.io import file_io, write_metadata
from hipscat.io import file_io, parquet_metadata, paths, write_metadata
from tqdm import tqdm

from hipscat_import.pipeline_resume_plan import PipelineResumePlan
from hipscat_import.soap.arguments import SoapArguments
from hipscat_import.soap.map_reduce import combine_partial_results, count_joins
from hipscat_import.soap.map_reduce import combine_partial_results, count_joins, reduce_joins
from hipscat_import.soap.resume_plan import SoapPlan


Expand All @@ -22,28 +22,48 @@ def run(args, client):
resume_plan = SoapPlan(args)
if not resume_plan.is_counting_done():
futures = []
for source_pixel, object_pixels, _source_key in resume_plan.count_keys:
for source_pixel, object_pixels, _ in resume_plan.count_keys:
futures.append(
client.submit(
count_joins,
soap_args=args,
source_pixel=source_pixel,
object_pixels=object_pixels,
cache_path=args.tmp_path,
)
)

resume_plan.wait_for_counting(futures)

if args.write_leaf_files and not resume_plan.is_reducing_done():
for object_pixel, object_key in resume_plan.reduce_keys:
futures.append(
client.submit(
reduce_joins,
soap_args=args,
object_pixel=object_pixel,
object_key=object_key,
)
)

resume_plan.wait_for_reducing(futures)

# All done - write out the metadata
with tqdm(
total=4, desc=PipelineResumePlan.get_formatted_stage_name("Finishing"), disable=not args.progress_bar
) as step_progress:
if args.write_leaf_files:
parquet_metadata.write_parquet_metadata(args.catalog_path)
total_rows = 0
metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
for row_group in parquet_metadata.read_row_group_fragments(metadata_path):
total_rows += row_group.num_rows
else:
total_rows = combine_partial_results(args.tmp_path, args.catalog_path)
# pylint: disable=duplicate-code
# Very similar to /index/run_index.py
combine_partial_results(args.tmp_path, args.catalog_path)
step_progress.update(1)
catalog_info = args.to_catalog_info(0)
total_rows = int(total_rows)
catalog_info = args.to_catalog_info(total_rows)
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
Expand Down
5 changes: 5 additions & 0 deletions tests/hipscat_import/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ def parquet_shards_dir(test_data_dir):
return os.path.join(test_data_dir, "parquet_shards")


@pytest.fixture
def soap_intermediate_dir(test_data_dir):
return os.path.join(test_data_dir, "soap_intermediate")


@pytest.fixture
def parquet_shards_shard_44_0(test_data_dir):
return os.path.join(
Expand Down
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/0_4.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,0,0,4,50
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/1_47.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,1,0,47,2395
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/2_176.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,2,0,176,385
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/2_177.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,2,0,177,1510
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/2_178.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,2,0,178,1634
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/2_179.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,2,0,179,1773
2 changes: 2 additions & 0 deletions tests/hipscat_import/data/soap_intermediate/2_180.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows
0,0,11,2,0,180,655
Loading

0 comments on commit b800040

Please sign in to comment.