-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Icechunk stores design doc #1
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
# GPM IMERG Icechunk Dataset Design Document | ||
|
||
This document describes the structure of the NASA GPM IMERG dataset and the Icechunk datasets we will be creating from this dataset. | ||
|
||
Goals: | ||
- Explore workflows for creating large Icechunk virtual datasets | ||
- Expose and resolve scalability bottlenecks withing Icechunk | ||
- Understand the performance of both writing and reading Icechunk stores | ||
- Compare virtual store read performance with rechunked native store read performance | ||
|
||
## About the Dataset | ||
|
||
Official Name: **GPM IMERG Final Precipitation L3 Half Hourly 0.1 degree x 0.1 degree V07 (GPM_3IMERGHH) at GES DISC** | ||
|
||
Official NASA Website: https://data.nasa.gov/dataset/GPM-IMERG-Final-Precipitation-L3-Half-Hourly-0-1-d/hqn4-tpfu/about_data | ||
|
||
S3 Bucket: `s3://gesdisc-cumulus-prod-protected/GPM_L3/GPM_3IMERGHH.07/` | ||
|
||
|
||
### Granules | ||
|
||
There is one NetCDF4 file produced at every 30-minute interval. The files are named like this: | ||
|
||
`s3://gesdisc-cumulus-prod-protected/GPM_L3/GPM_3IMERGHH.07/1998/001/3B-HHR.MS.MRG.3IMERG.19980101-S000000-E002959.0000.V07B.HDF5` | ||
|
||
|
||
<details> | ||
<summary>Python Function to compute the URL for any Python datetime</summary> | ||
|
||
```python | ||
from datetime import datetime, timedelta | ||
|
||
base_url = "s3://gesdisc-cumulus-prod-protected/GPM_L3/GPM_3IMERGHH.07" | ||
|
||
def make_url(date: datetime) -> str: | ||
end_date = date + timedelta(minutes=29, seconds=59) | ||
base_date = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0) | ||
delta_minutes = (date - base_date) // timedelta(minutes=1) | ||
components = [ | ||
base_url, | ||
"{:04d}".format(date.year), | ||
date.strftime('%j'), # day of year | ||
( | ||
"3B-HHR.MS.MRG.3IMERG." + | ||
date.strftime("%Y%m%d") + | ||
"-S" + date.strftime("%H%M%S") + | ||
"-E" + end_date.strftime("%H%M%S") + | ||
".{:04d}".format(delta_minutes) + | ||
".V07B.HDF5" | ||
) | ||
] | ||
return '/'.join(components) | ||
``` | ||
|
||
</details> | ||
|
||
There are 48 files per day. | ||
The dataset begins on Jan. 1, 1998. Through the end of 2024, this amounts to 473,328 files. | ||
|
||
### Internal File Structure | ||
|
||
The files are HDF5-backed NetCDF4 datasets with two groups: | ||
- `Grid` - this seems to be the primary dataset | ||
- `Grid/Intermediate` - some other variables. :point_left: _We will be ignoring this group for now._ | ||
|
||
Here are the contents of the `Grid` group | ||
|
||
| name | dtype | shape | chunk shape | num. chunks | | ||
|--|--|--|--|--| | ||
| nv | int32 | (2,) | (2,) | 1 | | ||
| lonv | int32 | (2,) | (2,) | 1 | | ||
| latv | int32 | (2,) | (2,) | 1 | | ||
| time | int32 | (1,) | (32,) | 0 | | ||
| lon | float32 | (3600,) | (3600,) | 1 | | ||
| lat | float32 | (1800,) | (1800,) | 1 | | ||
| time_bnds | int32 | (1, 2) | (32, 2) | 0 | | ||
| lon_bnds | float32 | (3600, 2) | (3600, 2) | 1 | | ||
| lat_bnds | float32 | (1800, 2) | (1800, 2) | 1 | | ||
| precipitation | float32 | (1, 3600, 1800) | (1, 145, 1800) | 24 | | ||
| randomError | float32 | (1, 3600, 1800) | (1, 145, 1800) | 24 | | ||
| probabilityLiquidPrecipitation | int16 | (1, 3600, 1800) | (1, 291, 1800) | 12 | | ||
| precipitationQualityIndex | float32 | (1, 3600, 1800) | (1, 145, 1800) | 24 | | ||
|
||
The primary data variables are `precipitation`, `randomError`, `probabilityLiquidPrecipitation`, and `precipitationQualityIndex`. | ||
These variables contain **84 chunks**. | ||
These will be the variables in our virtual dataset. | ||
|
||
## Overall Approach and Sequencing | ||
|
||
We will develop these datasets in the following sequence | ||
|
||
- [x] Small-scale (single day) virtual dataset | ||
- [ ] `IN-PROGRESS` Batch-processed virtual dataset of the entire record. | ||
- [ ] `TODO` Incremental appending to virtual Icechunk dataset | ||
- [ ] `TODO` Batch rechunking from virtual to native Icechunk | ||
|
||
### Small-scale (single day) virtual dataset | ||
|
||
This step allows us to test and verify the basic functionality before moving to the larger-scale data processing problem. | ||
- https://github.com/earth-mover/icechunk-nasa/blob/main/notebooks/write_virtual.ipynb | ||
- https://github.com/earth-mover/icechunk-nasa/blob/main/notebooks/read_virtual.ipynb | ||
|
||
This step revealed several issues in VirtualiZarr: | ||
|
||
- https://github.com/zarr-developers/VirtualiZarr/issues/336 (fixed) | ||
- https://github.com/zarr-developers/VirtualiZarr/issues/341 | ||
- https://github.com/zarr-developers/VirtualiZarr/issues/342 | ||
- https://github.com/zarr-developers/VirtualiZarr/issues/343 | ||
|
||
Mostly we were able to move forward by using the `HDFVirtualBackend`. | ||
|
||
<details> | ||
<summary>Function to fix fill_value encoding</summary> | ||
|
||
```python | ||
from xarray.backends.zarr import FillValueCoder | ||
|
||
def fix_ds(ds): | ||
ds = ds.copy() | ||
coder = FillValueCoder() | ||
# promote fill value to attr for zarr V3 | ||
for dvar in ds.data_vars: | ||
dtype = ds[dvar].dtype | ||
# this is wrong due to bug in Sean's reader | ||
#fill_value = dtype.type(ds_concat[dvar].data.zarray.fill_value) | ||
fill_value = dtype.type(ds[dvar].attrs['CodeMissingValue']) | ||
encoded_fill_value = coder.encode(fill_value, dtype) | ||
ds[dvar].attrs['_FillValue'] = encoded_fill_value | ||
``` | ||
|
||
</details> | ||
|
||
### Batch-processed virtual dataset of the entire record | ||
|
||
This step allows us to probe the scalability of virtual reference generation and storage in Icechunk. | ||
It involves building a single ARCO data cube from the entire 26-year GPM IMERG record. | ||
The final dataset will have 39,759,552 virtual chunks. | ||
|
||
We are currently using `Dask` to parallelize the reading of the files and generation of references. Full notebook [here](https://github.com/earth-mover/icechunk-nasa/blob/main/notebooks/build_virtual_ds_dask.ipynb. | ||
The relevant code looks something like this: | ||
|
||
```python | ||
import dask.bag as db | ||
import xarray as xr | ||
|
||
def reduce_via_concat(dsets): | ||
return xr.concat(dsets, dim="time", coords="minimal", join="override") | ||
|
||
b = db.from_sequence(all_times, partition_size=48) | ||
all_urls = db.map(make_url, b) | ||
vdsets = db.map(open_virtual, all_urls) | ||
concatted = vdsets.reduction(reduce_via_concat, reduce_via_concat) | ||
``` | ||
Comment on lines
+142
to
+153
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI my approach to this is to try and use lithops to parallelize the See zarr-developers/VirtualiZarr#349, and I also have a notebook using this that I need to publish. |
||
|
||
The `reduction` step allows us to iteratively combine the virtual datasets as they are read. | ||
|
||
In the first attempt, we tried to process all of the files in one go. | ||
Dask workers appeared to be running out of memory. | ||
So we switched to the approach of processing one year at a time. In pseudocode: | ||
|
||
```python | ||
# special case for first year | ||
ds_1998 = dset_for_year(1998) | ||
ds_1998.virtualize.to_icechunk(ic_repo) | ||
|
||
for year in range(1999, 2024): | ||
ds_year = dset_for_year(year) | ||
ds_year.virtualize.to_icechunk(ic_repo, append_dim="time") | ||
cid = ic_repo.commit(f"Appended {year}") | ||
``` | ||
|
||
We were able to create about 10 years of data this way. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🙌🏽 |
||
Each subsequent commit required more and more memory, until we eventually ran out of memory. | ||
|
||
When opening this store and reading back data, we observed two important performance bottlenecks: | ||
- Calling `group.members()` is very slow, causing `xr.open_dataset` to be slow. According to @dcherian, | ||
> yes this is known, `list_dir` is inefficient because it is `list_prefix` with post-filtering. | ||
|
||
We will be fixing this soon. | ||
- Reading any data from arrays (even small ones) is very slow and memory intensive. This is because it requires downloading and loading the entire chunk manifest for the entire dataset. | ||
Comment on lines
+175
to
+180
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is effectively a summary on where IC needs work to be able to support this use case better. The relevant IC issues are:
I'm not going to work further on this until we have made progress on those issues. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, we could move forward with demos using smaller-scale virtual datasets. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I would love to understand this better - I guess specifically how looking up chunk indices to byte ranges + file names are stored in icechunk and read by zarr. Are all chunk references stored together? Is it possible to load just the chunk references that are required for a specific query? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see these questions are somewhat answered below - so it is my understanding now that all chunk references are stored together and that option 3 (manifest sharding) would be one solution which enables loading select chunk references. |
||
|
||
|
||
The size of the chunk manifest appears to be a critical issue at this point. | ||
For 11 years of data (1998-2008 inclusive), the repo contains 16,972,032 chunks. | ||
This is less than half the total expected number of chunks. | ||
The manifest for this snapshot is 3,086,327,626 bytes (> 3 GB). | ||
|
||
Downloading and loading this huge manifest is consuming lots of time and memory. | ||
This also impacts appending, since the dataset must be opened and read by Xarray in order to append. | ||
Ideally, opening the Xarray dataset should _not have any performance characteristics that scale with size of the data variables_. | ||
|
||
There are several strategies we could explore to mitigate these issues: | ||
- **Better compression of manifest data.** Our current msgpack format does not use any compression whatsoever. Compressing the manifests will make them faster to download. | ||
- **Concurrent downloading of manifests.** For a 3 GB manifest, splitting the download over many threads will speed it up a lot. (This optimization applies to any file in Icechunk, including chunks.) | ||
- **Manifest sharding.** We can't allow manifests to grow without bound. The Icechunk Spec allows multiple manifests. The question is how do we split them up. This question merits a design doc all of its own. But here a couple of ideas: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm no Icechunk expert but this seems like the best option - let's discuss further. |
||
- Define maximum manifest size (e.g. 100 MB) | ||
- Split manifests by arrays | ||
- Split manifest by chunk index | ||
- Put all of the coordinates in a separate manifest. (Challenge: requires icechunk to "know" what coordinates are. This reaches across several layers of abstraction (Xarray data model -> Zarr data model -> Zarr Store Interface -> Icechunk). | ||
|
||
### Incremental appending to virtual Icechunk dataset | ||
|
||
We have already begun to explore appending in the previous item. | ||
Making appending work well is contingent on resolving the manifest scalability issues described above. | ||
|
||
### Batch rechunking from virtual to native Icechunk | ||
|
||
We will work on this after we have a good reference virtual dataset to start from. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also https://disc.gsfc.nasa.gov/datasets/GPM_3IMERGHHE_07/summary which I prefer as it has more information and links to the official documentation (apologies if you knew this already). Specifically, the linked technical documentation describes the data variables and that the introduction of the
Intermediate
group was to "minimize misinterpretation of variable names and reflect changes in the algorithm".