Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
rabernat authored Dec 27, 2024
1 parent 0bff0f6 commit 685bc36
Showing 1 changed file with 300 additions and 0 deletions.
300 changes: 300 additions & 0 deletions notebooks/build_virtual_ds_dask.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "2ac97f23-42e5-4f01-8645-18a9497c8f5d",
"metadata": {},
"source": [
"# Building a large GPM-IMERG Virtual Dataset with Dask"
]
},
{
"cell_type": "markdown",
"id": "c17e0760-b9a2-42d6-be28-cbcabaef7bb6",
"metadata": {},
"source": [
"## Define Functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b99f2aa-a886-4955-b375-6a0a2ebb6721",
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime, timedelta\n",
"import pandas as pd\n",
"import coiled\n",
"from dask import compute\n",
"import dask.bag as db\n",
"import itertools"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7936d0ef-3c36-4fd8-9984-fbd24ad47259",
"metadata": {},
"outputs": [],
"source": [
"base_url = \"s3://gesdisc-cumulus-prod-protected/GPM_L3/GPM_3IMERGHH.07\"\n",
"\n",
"def make_url(date: datetime) -> str:\n",
" \"\"\"Create an S3 URL for a specific datateime\"\"\"\n",
" \n",
" end_date = date + timedelta(minutes=29, seconds=59)\n",
" base_date = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0)\n",
" delta_minutes = (date - base_date) // timedelta(minutes=1)\n",
" components = [\n",
" base_url,\n",
" \"{:04d}\".format(date.year),\n",
" date.strftime('%j'), # day of year\n",
" (\n",
" \"3B-HHR.MS.MRG.3IMERG.\" +\n",
" date.strftime(\"%Y%m%d\") +\n",
" \"-S\" + date.strftime(\"%H%M%S\") +\n",
" \"-E\" + end_date.strftime(\"%H%M%S\") +\n",
" \".{:04d}\".format(delta_minutes) +\n",
" \".V07B.HDF5\"\n",
" )\n",
" ]\n",
" return '/'.join(components)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "754c6fc8-c131-40fc-9edf-77786312feb8",
"metadata": {},
"outputs": [],
"source": [
"def hours_for_day(day):\n",
" assert day.hour == day.minute == day.second == 0\n",
" return pd.date_range(start=day, periods=48, freq=\"30min\")\n",
"\n",
"def get_info_for_day(day):\n",
" return [get_info(make_url(full_datetime)) for full_datetime in hours_for_day(day)]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4291a146-518d-47dd-9aed-3ae4bf982461",
"metadata": {},
"outputs": [],
"source": [
"def open_virtual(url, keep_coords=True):\n",
" from virtualizarr.readers.hdf import HDFVirtualBackend\n",
" from virtualizarr import open_virtual_dataset\n",
"\n",
" drop_variables = [\"Intermediate\", \"nv\", \"lonv\", \"latv\"]\n",
" all_coords = [\"time\", \"lon\", \"lat\", \"time_bnds\", \"lon_bnds\", \"lat_bnds\"]\n",
" min_coords = [\"time\", \"time_bnds\"]\n",
"\n",
" if keep_coords:\n",
" my_drop_variables = drop_variables\n",
" loadable_variables = all_coords\n",
" my_coords = all_coords\n",
" else:\n",
" my_drop_variables = drop_variables + list(set(all_coords) - set(min_coords))\n",
" loadable_variables = min_coords\n",
" my_coords = min_coords\n",
" \n",
" ds = open_virtual_dataset(\n",
" url, indexes={}, group=\"Grid\", backend=HDFVirtualBackend,\n",
" drop_variables=my_drop_variables,\n",
" loadable_variables=loadable_variables\n",
" ).set_coords(my_coords)\n",
" return ds"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e18547b9-6272-4705-8cff-0370f448707d",
"metadata": {},
"outputs": [],
"source": [
"def reduce_via_concat(dsets):\n",
" import xarray as xr\n",
" return xr.concat(dsets, dim=\"time\", coords=\"minimal\", join=\"override\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7fd2a6a0-e4dc-49af-b23e-4f9581a8d2e4",
"metadata": {},
"outputs": [],
"source": [
"from xarray.backends.zarr import FillValueCoder\n",
"\n",
"def fix_ds(ds):\n",
" \"\"\"Fix fill-value encoding of GPM IMERG data variables\"\"\"\n",
" \n",
" ds = ds.copy()\n",
" coder = FillValueCoder()\n",
" # promote fill value to attr for zarr V3\n",
" for dvar in ds.data_vars:\n",
" dtype = ds[dvar].dtype\n",
" # this is wrong due to bug in Sean's reader\n",
" #fill_value = dtype.type(ds_concat[dvar].data.zarray.fill_value)\n",
" fill_value = dtype.type(ds[dvar].attrs['CodeMissingValue'])\n",
" encoded_fill_value = coder.encode(fill_value, dtype)\n",
" ds[dvar].attrs['_FillValue'] = encoded_fill_value\n",
" \n",
" return ds"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32b93f63-0e61-4dd1-88a3-93bdb59e8739",
"metadata": {},
"outputs": [],
"source": [
"def dset_for_year(year):\n",
" all_days = pd.date_range(start=f\"{year}-01-01\", end=f\"{year}-12-31\", freq=\"1D\")\n",
" all_times = list(itertools.chain(*[hours_for_day(day) for day in all_days]))\n",
"\n",
" b = db.from_sequence(all_times, partition_size=48)\n",
" all_urls = db.map(make_url, b)\n",
" vdsets = db.map(open_virtual, all_urls)\n",
" concatted = vdsets.reduction(reduce_via_concat, reduce_via_concat)\n",
" ds = concatted.compute()\n",
" return fix_ds(ds)"
]
},
{
"cell_type": "markdown",
"id": "9b5bb51b-eb6b-4fff-914d-7f948556bdf2",
"metadata": {},
"source": [
"## Do Computations"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "756d52bb-85c1-47a5-9d43-1bc134a51079",
"metadata": {},
"outputs": [],
"source": [
"cluster = coiled.Cluster(\n",
" software=\"icechunk-virtualizarr\",\n",
" region=\"us-west-2\",\n",
" n_workers=100,\n",
")\n",
"cluster.send_private_envs({\"ARRAYLAKE_TOKEN\": \"***\"}) # fill in appropriately\n",
"client = cluster.get_client()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "91eec540-7d12-4caf-9f50-4d224b229c52",
"metadata": {},
"outputs": [],
"source": [
"from arraylake import Client\n",
"aclient = Client()"
]
},
{
"cell_type": "markdown",
"id": "a5a0696d-aa3d-48da-a695-3be10297c4a5",
"metadata": {},
"source": [
"### Create the repo and write the first year"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aec6905c-466e-4d21-b3a7-e14fe0eb0777",
"metadata": {},
"outputs": [],
"source": [
"ic_repo = aclient.get_or_create_repo(\"nasa-impact/GPM_3IMERGHH.07-virtual-full\", kind=\"icechunk\")\n",
"ic_repo"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "39d6b55d-59fd-45d9-bd15-4508f1450f42",
"metadata": {},
"outputs": [],
"source": [
"ds_1998 = dset_for_year(1998)\n",
"ds_1998.virtualize.to_icechunk(ic_repo)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87ee545e-cc98-4685-92bb-2d2c9e077eb5",
"metadata": {},
"outputs": [],
"source": [
"ic_repo.commit(\"Wrote 1998\")"
]
},
{
"cell_type": "markdown",
"id": "14408456-76c5-4717-b56d-7bed44d4b76b",
"metadata": {},
"source": [
"### Compute and Append Subsequent Years\n",
"\n",
"This starts okay, but each subsequent append takes more and more memory.\n",
"Stops working around 2009."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40e380fd-b7c4-4dc5-9ba7-64f391d2f386",
"metadata": {},
"outputs": [],
"source": [
"for year in range(1999, 2024):\n",
" print(year)\n",
" ds_year = dset_for_year(year)\n",
" ds_year.virtualize.to_icechunk(ic_repo, append_dim=\"time\")\n",
" cid = ic_repo.commit(f\"Appended {year}\")\n",
" print(cid)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b4a85474-0499-48f4-b997-0b899dd06480",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit 685bc36

Please sign in to comment.