diff --git a/notebooks/build_virtual_ds_dask.ipynb b/notebooks/build_virtual_ds_dask.ipynb
new file mode 100644
index 0000000..f11cf73
--- /dev/null
+++ b/notebooks/build_virtual_ds_dask.ipynb
@@ -0,0 +1,300 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "2ac97f23-42e5-4f01-8645-18a9497c8f5d",
+   "metadata": {},
+   "source": [
+    "# Building a large GPM-IMERG Virtual Dataset with Dask"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c17e0760-b9a2-42d6-be28-cbcabaef7bb6",
+   "metadata": {},
+   "source": [
+    "## Define Functions"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1b99f2aa-a886-4955-b375-6a0a2ebb6721",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from datetime import datetime, timedelta\n",
+    "import pandas as pd\n",
+    "import coiled\n",
+    "from dask import compute\n",
+    "import dask.bag as db\n",
+    "import itertools"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7936d0ef-3c36-4fd8-9984-fbd24ad47259",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "base_url = \"s3://gesdisc-cumulus-prod-protected/GPM_L3/GPM_3IMERGHH.07\"\n",
+    "\n",
+    "def make_url(date: datetime) -> str:\n",
+    "    \"\"\"Create an S3 URL for a specific datetime.\"\"\"\n",
+    "\n",
+    "    end_date = date + timedelta(minutes=29, seconds=59)\n",
+    "    base_date = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0)\n",
+    "    # minutes elapsed since midnight, used as the granule sequence number\n",
+    "    delta_minutes = (date - base_date) // timedelta(minutes=1)\n",
+    "    components = [\n",
+    "        base_url,\n",
+    "        \"{:04d}\".format(date.year),\n",
+    "        date.strftime('%j'),  # day of year\n",
+    "        (\n",
+    "            \"3B-HHR.MS.MRG.3IMERG.\" +\n",
+    "            date.strftime(\"%Y%m%d\") +\n",
+    "            \"-S\" + date.strftime(\"%H%M%S\") +\n",
+    "            \"-E\" + end_date.strftime(\"%H%M%S\") +\n",
+    "            \".{:04d}\".format(delta_minutes) +\n",
+    "            \".V07B.HDF5\"\n",
+    "        )\n",
+    "    ]\n",
+    "    return '/'.join(components)\n"
+   ]
+  },
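+  {
+   "cell_type": "markdown",
+   "id": "3f9d1a20-5b7e-4c2a-9c1d-8e6f2a4b0c11",
+   "metadata": {},
+   "source": [
+    "A quick, illustrative sanity check of `make_url`: the expected string below is just the V07B naming convention above applied by hand to the first half-hour granule of 1998."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3f9d1a20-5b7e-4c2a-9c1d-8e6f2a4b0c12",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Expected URL worked out by hand from the naming convention encoded in make_url:\n",
+    "# day-of-year 001, start 00:00:00, end 00:29:59, sequence number 0000\n",
+    "expected = (\n",
+    "    base_url\n",
+    "    + \"/1998/001/3B-HHR.MS.MRG.3IMERG.19980101-S000000-E002959.0000.V07B.HDF5\"\n",
+    ")\n",
+    "assert make_url(datetime(1998, 1, 1, 0, 0)) == expected"
+   ]
+  },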
"e18547b9-6272-4705-8cff-0370f448707d", + "metadata": {}, + "outputs": [], + "source": [ + "def reduce_via_concat(dsets):\n", + " import xarray as xr\n", + " return xr.concat(dsets, dim=\"time\", coords=\"minimal\", join=\"override\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7fd2a6a0-e4dc-49af-b23e-4f9581a8d2e4", + "metadata": {}, + "outputs": [], + "source": [ + "from xarray.backends.zarr import FillValueCoder\n", + "\n", + "def fix_ds(ds):\n", + " \"\"\"Fix fill-value encoding of GPM IMERG data variables\"\"\"\n", + " \n", + " ds = ds.copy()\n", + " coder = FillValueCoder()\n", + " # promote fill value to attr for zarr V3\n", + " for dvar in ds.data_vars:\n", + " dtype = ds[dvar].dtype\n", + " # this is wrong due to bug in Sean's reader\n", + " #fill_value = dtype.type(ds_concat[dvar].data.zarray.fill_value)\n", + " fill_value = dtype.type(ds[dvar].attrs['CodeMissingValue'])\n", + " encoded_fill_value = coder.encode(fill_value, dtype)\n", + " ds[dvar].attrs['_FillValue'] = encoded_fill_value\n", + " \n", + " return ds" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "32b93f63-0e61-4dd1-88a3-93bdb59e8739", + "metadata": {}, + "outputs": [], + "source": [ + "def dset_for_year(year):\n", + " all_days = pd.date_range(start=f\"{year}-01-01\", end=f\"{year}-12-31\", freq=\"1D\")\n", + " all_times = list(itertools.chain(*[hours_for_day(day) for day in all_days]))\n", + "\n", + " b = db.from_sequence(all_times, partition_size=48)\n", + " all_urls = db.map(make_url, b)\n", + " vdsets = db.map(open_virtual, all_urls)\n", + " concatted = vdsets.reduction(reduce_via_concat, reduce_via_concat)\n", + " ds = concatted.compute()\n", + " return fix_ds(ds)" + ] + }, + { + "cell_type": "markdown", + "id": "9b5bb51b-eb6b-4fff-914d-7f948556bdf2", + "metadata": {}, + "source": [ + "## Do Computations" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "756d52bb-85c1-47a5-9d43-1bc134a51079", + "metadata": {}, + "outputs": [], + "source": [ + "cluster = coiled.Cluster(\n", + " software=\"icechunk-virtualizarr\",\n", + " region=\"us-west-2\",\n", + " n_workers=100,\n", + ")\n", + "cluster.send_private_envs({\"ARRAYLAKE_TOKEN\": \"***\"}) # fill in appropriately\n", + "client = cluster.get_client()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91eec540-7d12-4caf-9f50-4d224b229c52", + "metadata": {}, + "outputs": [], + "source": [ + "from arraylake import Client\n", + "aclient = Client()" + ] + }, + { + "cell_type": "markdown", + "id": "a5a0696d-aa3d-48da-a695-3be10297c4a5", + "metadata": {}, + "source": [ + "### Create the repo and write the first year" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aec6905c-466e-4d21-b3a7-e14fe0eb0777", + "metadata": {}, + "outputs": [], + "source": [ + "ic_repo = aclient.get_or_create_repo(\"nasa-impact/GPM_3IMERGHH.07-virtual-full\", kind=\"icechunk\")\n", + "ic_repo" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39d6b55d-59fd-45d9-bd15-4508f1450f42", + "metadata": {}, + "outputs": [], + "source": [ + "ds_1998 = dset_for_year(1998)\n", + "ds_1998.virtualize.to_icechunk(ic_repo)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "87ee545e-cc98-4685-92bb-2d2c9e077eb5", + "metadata": {}, + "outputs": [], + "source": [ + "ic_repo.commit(\"Wrote 1998\")" + ] + }, + { + "cell_type": "markdown", + "id": "14408456-76c5-4717-b56d-7bed44d4b76b", + "metadata": {}, + "source": [ + "### Compute and Append Subsequent 
Years\n", + "\n", + "This starts okay, but each subsequent append takes more and more memory.\n", + "Stops working around 2009." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40e380fd-b7c4-4dc5-9ba7-64f391d2f386", + "metadata": {}, + "outputs": [], + "source": [ + "for year in range(1999, 2024):\n", + " print(year)\n", + " ds_year = dset_for_year(year)\n", + " ds_year.virtualize.to_icechunk(ic_repo, append_dim=\"time\")\n", + " cid = ic_repo.commit(f\"Appended {year}\")\n", + " print(cid)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b4a85474-0499-48f4-b997-0b899dd06480", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}