
Import ZTF DR22 on Epyc #452

Closed · hombit opened this issue Dec 3, 2024 · 17 comments

@hombit (Contributor) commented Dec 3, 2024:

Raw data is here: /data3/epyc/data3/hats/raw/ztf/lc_dr22

The result catalog should go here: /data3/epyc/data3/hats/catalogs/ztf_dr22/ztf_lc

I haven't checked the checksums of the files, so failures are possible.

gitosaurus moved this to In Progress in HATS / LSDB on Dec 3, 2024.
@gitosaurus (Contributor) commented:

Per the README, checked the checksums of the files. The check has not yet completed, probably because I set its priority very low (renice 20, in order not to interfere with other work):

nohup md5sum -c checksum.md5 > ~/md5-output.log &
# Process ID is 127216
renice 20 127216

At the time of this writing, it is still running, and one of the parquet files has failed its checksum so far:

wc -l checksum.md5 ~/md5-output.log
  176561 checksum.md5
  105392 /astro/users/dtj1s/md5-output.log
  281953 total

grep -v ' OK$' ~/md5-output.log
./0/field000385/ztf_000385_zr_c11_q2_dr22.parquet: FAILED

@gitosaurus (Contributor) commented:

The import also failed with this message:

RuntimeError: Some mapping stages failed. See logs for details.

The script I used:

from dask.distributed import Client
from hats_import.pipeline import pipeline_with_client
from hats_import.catalog.arguments import ImportArguments


def main(input_path: str, output_path: str = "./output", output_artifact="test_cat"):
    args = ImportArguments(
        sort_columns="objectid",
        ra_column="objra",
        dec_column="objdec",
        input_path=input_path,
        output_artifact_name=output_artifact,
        output_path=output_path,
        file_reader="parquet",
    )
    with Client(n_workers=10, memory_limit="10GB", threads_per_worker=2) as client:
        pipeline_with_client(args, client)


if __name__ == "__main__":
    main(
        "/data3/epyc/data3/hats/raw/ztf/lc_dr22",
        "/data3/epyc/data3/hats_import/catalogs/ztf_dr22",
        "ztf_lc",
    )

The most common errors printed to the console were of this form:

Failed MAPPING stage with file /data3/epyc/data3/hats/raw/ztf/lc_dr22/1/index.html?C=N;O=D
  worker address: tcp://127.0.0.1:38628
Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
Failed MAPPING stage with file /data3/epyc/data3/hats/raw/ztf/lc_dr22/1/index.html?C=N;O=D
2024-12-03 19:20:09,841 - distributed.worker - ERROR - Compute Failed

I will try to resume the import in a way that specifically only includes *.parquet files.

@gitosaurus (Contributor) commented:

Verification of the checksums concluded, and only one file was found to have failed. Restored this file by re-downloading it from the source and verifying the checksum:

curl https://irsa.ipac.caltech.edu/data/ZTF/lc/lc_dr22/0/field000385/ztf_000385_zr_c11_q2_dr22.parquet -o fixme.parquet
md5sum fixme.parquet
12b316eac3768bfac393c503ff94f55e  # correct
cp fixme.parquet ./0/field000385/ztf_000385_zr_c11_q2_dr22.parquet
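
For reference, the same check can be done in Python; a minimal sketch using only the standard library, where the expected hash is the value listed for this file in checksum.md5:

import hashlib
from pathlib import Path

EXPECTED = "12b316eac3768bfac393c503ff94f55e"  # value listed in checksum.md5 for this file

def md5_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash the file in 1 MiB chunks so large parquet files need not fit in memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

assert md5_of(Path("fixme.parquet")) == EXPECTED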

@gitosaurus (Contributor) commented:

Modifying the script to use an explicit glob pattern that identifies only the Parquet files. Note also the change in output path, now possible due to corrected permissions.

import glob

from dask.distributed import Client
from hats_import.pipeline import pipeline_with_client
from hats_import.catalog.arguments import ImportArguments


def main(input_path: str, output_path: str = "./output", output_artifact="test_cat"):
    # Cannot use input_path directly because the input directory contains many, many
    # non-parquet files, which clog up the pipeline.  Rather, use the glob module
    # to create an authoritative and complete list of parquet files.
    print(f"Reading {input_path} for *.parquet files")
    parquet_list = glob.glob(f"{input_path}/**/*.parquet", recursive=True)
    print(f"Parquet files found: {len(parquet_list)}")
    args = ImportArguments(
        sort_columns="objectid",
        ra_column="objra",
        dec_column="objdec",
        # input_path=input_path,
        input_file_list=parquet_list,
        output_artifact_name=output_artifact,
        output_path=output_path,
        file_reader="parquet",
    )
    with Client(n_workers=10, memory_limit="10GB", threads_per_worker=2) as client:
        pipeline_with_client(args, client)


if __name__ == "__main__":
    main(
        "/data3/epyc/data3/hats/raw/ztf/lc_dr22",
        "/data3/epyc/data3/hats/catalogs/ztf_dr22",
        "ztf_lc",
    )

@gitosaurus (Contributor) commented:

With a hint from @delucchi-cmu, created batched index files of the Parquet files, to reduce the amount of file I/O and to compensate for the fact that the file sizes span several orders of magnitude:

import glob
from pathlib import Path

import binpacking
from dask.distributed import Client
from hats_import.pipeline import pipeline_with_client
from hats_import.catalog.arguments import ImportArguments


def prepare_batches(input_path: str, n_batches: int, prefix: str) -> list[Path]:
    """
    Given a directory to search for `.parquet` files, find them all,
    get their sizes, and sort them into n_batches files, each of which
    will contain roughly an equal number of bytes.  These batches
    will be written to a family of files that all start with `prefix`
    and are numbered.
    """
    print(f"Reading {input_path} for *.parquet files")
    parquet_list = glob.glob(f"{input_path}/**/*.parquet", recursive=True)
    print(f"Parquet files found: {len(parquet_list)}")
    file_sizes = [Path(p).stat().st_size for p in parquet_list]
    print(f"Total size of the files: {sum(file_sizes)}")
    print(f"Sorting into {n_batches} batches")
    batches = binpacking.to_constant_bin_number(
        list(zip(file_sizes, parquet_list)), n_batches, weight_pos=0
    )
    # Now write out the batch files
    batch_files = []
    for i, b in enumerate(batches):
        batch_file = Path(f"{prefix}_{i:03d}.batch")
        batch_file.write_text("\n".join([fname for sz, fname in b]) + "\n")
        batch_files.append(batch_file)
    return batch_files


def main(input_path: str, n_batches: int, output_path: str = "./output", output_artifact="test_cat"):
    batch_files = prepare_batches(input_path, n_batches, "./batch")
    args = ImportArguments(
        sort_columns="objectid",
        ra_column="objra",
        dec_column="objdec",
        input_file_list=batch_files,
        output_artifact_name=output_artifact,
        output_path=output_path,
        file_reader="indexed_parquet",
    )
    # Attempt to scale splitting phase.
    with Client(n_workers=8, memory_limit="32GB", threads_per_worker=2) as client:
        pipeline_with_client(args, client)

This caused the planning, mapping, and binning phases to complete much more quickly, each phase on the order of 20 to 30 minutes. The splitting phase is still taking a long time (~8h projected) but appears to be on track to complete without errors or warnings.

@gitosaurus (Contributor) commented:

Uh oh.

Planning  : 100%|██████████| 4/4 [00:07<00:00,  1.88s/it]
Binning   : 100%|██████████| 2/2 [00:11<00:00,  5.85s/it]
Splitting : 100%|██████████| 2683/2683 [7:38:01<00:00, 10.24s/it]
Reducing  :  13%|█▎        | 1443/10839 [43:50<2:19:43,  1.12it/s]
Failed REDUCING stage for shard: 5 1168
  worker address: tcp://127.0.0.1:38087
Unexpected number of objects at pixel (Order: 5, Pixel: 1168). Expected 340518, wrote 326169
Failed REDUCING stage for shard: 5 1168
  worker address: tcp://127.0.0.1:38087
Unexpected number of objects at pixel (Order: 5, Pixel: 1168). Expected 340518, wrote 326169

Will try restarting in case it was a question of resources.
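
If restarting doesn't help, one way to sanity-check the expected count would be to recompute the order-5 pixel of each object directly from the raw files. A rough sketch (my addition, not part of the run above), assuming healpy and the NESTED HEALPix scheme that HATS uses, with order 5 meaning nside = 2**5 = 32; scanning all ~176k files this way would be slow, so it is only meant for spot checks on a suspect subset:

import glob

import healpy as hp
import pyarrow.parquet as pq

ORDER, TARGET_PIXEL = 5, 1168
NSIDE = 2 ** ORDER  # 32

count = 0
# Adjust the glob to whichever fields are suspected to overlap the failing pixel.
for path in glob.glob("/data3/epyc/data3/hats/raw/ztf/lc_dr22/0/field000*/ztf_*.parquet"):
    table = pq.read_table(path, columns=["objra", "objdec"])
    pixels = hp.ang2pix(NSIDE, table["objra"].to_numpy(), table["objdec"].to_numpy(),
                        nest=True, lonlat=True)
    count += int((pixels == TARGET_PIXEL).sum())
print(f"objects falling in (Order: {ORDER}, Pixel: {TARGET_PIXEL}) in this subset: {count}")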

@gitosaurus (Contributor) commented:

No such luck. The error is stable, so this particular attempt appears to be at an end.

Failed REDUCING stage for shard: 5 1168
  worker address: tcp://127.0.0.1:33080
Unexpected number of objects at pixel (Order: 5, Pixel: 1168). Expected 340518, wrote 326169
Failed REDUCING stage for shard: 5 1168
  worker address: tcp://127.0.0.1:33080
Unexpected number of objects at pixel (Order: 5, Pixel: 1168). Expected 340518, wrote 326169
2024-12-09 15:43:57,157 - distributed.worker - ERROR - Compute Failed
Key:       reduce_pixel_shards-3d9b695a9bba43de503ca033ebd7a136

I have another scheme for batching the files that keeps each field's files together. I had originally expected it to be only an optimization, but perhaps something about it will let the import get past this obstacle. This is the code that creates the batches:

import re


def write_batches(parquet_file_sizes: list[tuple[int, str]], n_bins: int, prefix: str):
    """Write batch files of roughly equal total size, never splitting a field across batches."""
    total_size = sum([x[0] for x in parquet_file_sizes])
    bin_size = total_size // n_bins
    pat = re.compile(r'^.*/field(\d+)/.*$')
    field = None
    batch_count = 0
    bytes_in_bin = 0
    f_out = None

    def cycle_batch(done=False):
        nonlocal bytes_in_bin, batch_count, f_out
        if f_out is not None:
            f_out.close()
            print("Closing batch", batch_count, "containing", bytes_in_bin, "bytes")
        bytes_in_bin = 0
        batch_count += 1
        if not done:
            f_out = open(f"{prefix}_{batch_count:03d}.batch", "w")

    cycle_batch()
    while parquet_file_sizes:
        file_sz, file_name = parquet_file_sizes.pop(0)
        next_field = pat.match(file_name).group(1)
        # Only start a new batch once the size threshold is exceeded AND the field changes,
        # so that all files of a given field stay in the same batch.
        if bytes_in_bin > bin_size and next_field != field:
            cycle_batch()
        f_out.write(file_name + '\n')
        field = next_field
        bytes_in_bin += file_sz
    cycle_batch(done=True)
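
A hypothetical driver for the above, reusing the size-gathering from prepare_batches; the sort order and the n_bins value here are my assumptions, not values recorded in the thread:

import glob
from pathlib import Path

input_path = "/data3/epyc/data3/hats/raw/ztf/lc_dr22"
# Sort by path so that all files of a given field are adjacent; write_batches only
# starts a new batch once the size threshold is crossed and the field changes.
parquet_list = sorted(glob.glob(f"{input_path}/**/*.parquet", recursive=True))
parquet_file_sizes = [(Path(p).stat().st_size, p) for p in parquet_list]
write_batches(parquet_file_sizes, n_bins=4000, prefix="limit_1_5G")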

@gitosaurus (Contributor) commented:

Used the above to create index files with a limit of 1.5 GiB apiece. This ran smoothly for planning and mapping, but splitting, even though it seems to be continuing, has run into the error below. The failing batch contains 128 parquet files, most of them about 20 MiB, all from field 478; the batches covering that field are limit_1_5G_99{5,6,7}.batch.

Failed SPLITTING stage with file limit_1_5G_996.batch
  worker address: tcp://127.0.0.1:34155
Unsupported cast from list<item: double> to double using function cast_double
2024-12-10 13:53:44,159 - distributed.worker - ERROR - Compute Failed
Key:       split_pixels-efbca3cbd64256d07453ff2ef83e6274
State:     executing
Task:  <Task 'split_pixels-efbca3cbd64256d07453ff2ef83e6274' split_pixels(, ...)>
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/hats_import/catalog/map_reduce.py", line 198, in split_pixels
    raise exception
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/hats_import/catalog/map_reduce.py", line 170, in split_pixels
    for chunk_number, data, mapped_pixels in _iterate_input_file(
                                             ^^^^^^^^^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/hats_import/catalog/map_reduce.py", line 52, in _iterate_input_file
    for chunk_number, data in enumerate(file_reader.read(input_file, read_columns=read_columns)):
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/hats_import/catalog/file_readers.py", line 431, in read
    for batch in input_dataset.to_batches(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_dataset.pyx", line 3806, in _iterator
  File "pyarrow/_dataset.pyx", line 3424, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status

@gitosaurus (Contributor) commented:

Splitting completed after 6h but then the import stopped and failed, re-reporting the error above.

Reran this, but it errored out exactly the same way on exactly the same batch, limit_1_5G_996.batch. This contains 128 files from field 478.

Reading the schema out of all of these files, 126 of them look like this:

objectid: int64
filterid: int8
fieldid: int16
rcid: int8
objra: float
objdec: float
nepochs: int64
hmjd: list<item: double>
  child 0, item: double
mag: list<item: float>
  child 0, item: float
magerr: list<item: float>
  child 0, item: float
clrcoeff: list<item: float>
  child 0, item: float
catflags: list<item: int32>
  child 0, item: int32
__index_level_0__: int64
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 1688

and two of them look like this:

objectid: int64
filterid: int8
fieldid: int16
rcid: int8
objra: float
objdec: float
nepochs: int64
hmjd: double
mag: float
magerr: float
clrcoeff: float
catflags: int32
__index_level_0__: uint32
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 1663

@gitosaurus (Contributor) commented Dec 11, 2024:

The only two files in that batch which do not have the list<double> schema are:

/data3/epyc/data3/hats/raw/ztf/lc_dr22/0/field000478/ztf_000478_zg_c15_q3_dr22.parquet
/data3/epyc/data3/hats/raw/ztf/lc_dr22/0/field000478/ztf_000478_zr_c15_q1_dr22.parquet

@gitosaurus (Contributor) commented:

There are two other batches with field 478, and all of the files in those batches use list<double>.

@gitosaurus (Contributor) commented:

Other batches that contain files with the non-list schema:

limit_1_5G_201.batch
limit_1_5G_691.batch
limit_1_5G_935.batch
limit_1_5G_988.batch
limit_1_5G_996.batch
limit_1_5G_1000.batch
limit_1_5G_1207.batch
limit_1_5G_2231.batch
limit_1_5G_3962.batch
limit_1_5G_3970.batch

I don't know why these others didn't cause the same problem. Luck of the draw?

@delucchi-cmu (Contributor) commented Dec 11, 2024:

Found 129 empty parquet files; the list is attached as empty_parquet.txt. The script used to find them:

import pyarrow.parquet as pq

from pathlib import Path

field_dir = Path("/data3/epyc/data3/hats/raw/ztf/lc_dr22/")
files = field_dir.glob("*/*/*.parquet")
num_good = 0
for file in files:
    parquet_file = pq.ParquetFile(file)
    num_rows = parquet_file.metadata.num_rows
    if num_rows == 0:
        print(file)
    else:
        num_good += 1
print("num_good", num_good)

@gitosaurus (Contributor) commented:

Died right at the end with this:

Mapping   : 100%|██████████| 4239/4239 [11:12<00:00]
Binning   : 100%|██████████| 2/2 [32:19<00:00, 969.75s/it]
Splitting : 100%|██████████| 4239/4239 [6:35:56<00:00,  5.60s/it]
Reducing  : 100%|██████████| 10839/10839 [5:23:39<00:00,  1.79s/it]
Finishing : 100%|██████████| 4/4 [02:21<00:00, 35.26s/it]
2024-12-11 23:20:38,221 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/worker.py", line 1269, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/utils_comm.py", line 441, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/utils_comm.py", line 420, in retry
    return await coro()
           ^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/core.py", line 1259, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/core.py", line 1018, in send_recv
    response = await comm.read(deserializers=deserializers)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/comm/tcp.py", line 236, in read
    convert_stream_closed_error(self, e)
  File "/astro/users/dtj1s/.conda/envs/dtj1s-py3.12/lib/python3.12/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:41222 remote=tcp://127.0.0.1:33443>: Stream is closed

This looked to me like an error near the end, so I simply re-ran it, expecting it to resume what wasn't done. Unfortunately it seems to have started all over again, and I don't know how to characterize the state the output catalog may be in.

@delucchi-cmu (Contributor) commented:

If "Finishing" gets to 100%, then the job is finished. This error just means that dask did a bad job of saying goodbye.

@gitosaurus (Contributor) commented Dec 13, 2024:

Job finished, reaching 100%. Tested the output catalog:

from hats.io.validation import is_valid_catalog

output_path = "/data3/epyc/data3/hats/catalogs/ztf_dr22/"
ztf_dr22_path = output_path + "ztf_lc"
print(is_valid_catalog(ztf_dr22_path))
True

Catalog is valid.
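
As a further smoke test, the catalog can be opened lazily; a short sketch, assuming a recent lsdb release that provides read_hats:

import lsdb

catalog = lsdb.read_hats("/data3/epyc/data3/hats/catalogs/ztf_dr22/ztf_lc")
print(catalog)  # shows the partition structure and column schema without loading data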

gitosaurus moved this from In Progress to Done in HATS / LSDB on Dec 13, 2024.
gitosaurus moved this from Done to In Progress in HATS / LSDB on Dec 13, 2024.