diff --git a/doc/big_datasets.rst b/doc/big_datasets.rst index bac60e83..da5d1827 100644 --- a/doc/big_datasets.rst +++ b/doc/big_datasets.rst @@ -1,11 +1,42 @@ Handling Large Datasets ------------------------------------- -Often, one desires to use *tobac* to identify and track features in large datasets ("big data"). This documentation strives to suggest various methods for doing so efficiently. Current versions of *tobac* do not allow for out-of-memory computation, meaning that these strategies may need to be employed for both computational and memory reasons. +Often, one desires to use *tobac* to identify and track features in large datasets ("big data"). This documentation suggests several strategies for doing so efficiently. Current versions of *tobac* do not support out-of-core (e.g., :code:`dask`) computation, meaning that these strategies may need to be employed for both computational and memory reasons. .. _Split Feature Detection: -======================= +=========================================== -Split Feature Detection +Split Feature Detection and Run in Parallel -======================= +=========================================== -Current versions of threshold feature detection (see :doc:`feature_detection_overview`) are time independent, meaning that one can parallelize feature detection across all times (although not across space). *tobac* provides the :py:meth:`tobac.utils.combine_tobac_feats` function to combine a list of dataframes produced by a parallelization method (such as :code:`jug` or :code:`multiprocessing.pool`) into a single combined dataframe suitable to perform tracking with. +Current versions of threshold feature detection (see :doc:`feature_detection_overview`) are time independent, meaning that one can easily parallelize feature detection across all times (although not across space). 
*tobac* provides the :py:func:`tobac.utils.combine_feature_dataframes` function to combine a list of dataframes produced by a parallelization method (such as :code:`jug`, :code:`multiprocessing.pool`, or :code:`dask.bag`) into a single combined dataframe suitable for performing tracking. + +Below is a snippet from a larger notebook demonstrating how to run feature detection in parallel (:doc:`big_datasets_examples/notebooks/parallel_processing_tobac`). + +.. code-block:: python + + # build a dask bag of single-time data slices, then map feature detection over each slice + b = dask.bag.from_sequence([combined_ds['data'][x:x+1] for x in range(len(combined_ds['time']))], npartitions=1) + out_feature_dfs = dask.bag.map(lambda x: tobac.feature_detection_multithreshold(x.to_iris(), 4000, **parameters_features), b).compute() + combined_dataframes = tobac.utils.general.combine_feature_dataframes(out_feature_dfs) + + +.. _Split Segmentation: + +====================================== +Split Segmentation and Run in Parallel +====================================== +Recall that the segmentation mask (see :doc:`segmentation_output`) is the same size as the input grid, which results in large files when handling large input datasets. The following strategies can help reduce the output size and make segmentation masks more useful for analysis. + +The first strategy is to segment only on features *after tracking and quality control*. While this will not directly impact performance, waiting to run segmentation on the final set of features (after discarding, e.g., non-tracked cells) can make analysis of the output segmentation dataset easier. + +To speed up segmentation, one can process multiple segmentation times independently in parallel, similar to feature detection. Unlike feature detection, however, there is currently no built-in *tobac* method to combine multiple segmentation times into a single file. 
While one can do this using typical NetCDF tools such as :code:`ncrcat` or xarray utilities such as :code:`xr.concat`, one can also leave the segmentation mask output as separate files and open them later with multi-file readers such as :code:`xr.open_mfdataset`. + + +.. _Tracking Hanging: + +===================================== +Tracking Hangs with Too Many Features +===================================== + +When tracking on a large dataset, :code:`tobac.tracking.linking_trackpy` can hang when using the default parameters. This occurs because the underlying tracking library, :code:`trackpy`, searches too large an area for each feature's match in the next timestep. This can be solved *without impacting the scientific output* by lowering the :code:`subnetwork_size` parameter in :code:`tobac.tracking.linking_trackpy`. + diff --git a/doc/big_datasets_examples/index.rst b/doc/big_datasets_examples/index.rst new file mode 100644 index 00000000..51b474c2 --- /dev/null +++ b/doc/big_datasets_examples/index.rst @@ -0,0 +1,8 @@ +############################## + Big Data Processing Examples +############################## + +.. toctree:: + :maxdepth: 2 + + notebooks/parallel_processing_tobac diff --git a/doc/big_datasets_examples/notebooks/parallel_processing_tobac.ipynb b/doc/big_datasets_examples/notebooks/parallel_processing_tobac.ipynb new file mode 100644 index 00000000..a1c6825a --- /dev/null +++ b/doc/big_datasets_examples/notebooks/parallel_processing_tobac.ipynb @@ -0,0 +1,1146 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Parallel Processing of Feature Detection with `dask`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook demonstrates how to run *tobac* feature detection in parallel using the `dask` library as the parallel processor." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Imports and Dask Cluster Setup" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/seanfreeman/mambaforge/envs/tobac_installed/lib/python3.11/site-packages/tobac/utils/decorators.py:366: UserWarning: Numba not able to be imported; periodic boundary calculations will be slower.Exception raised: ModuleNotFoundError(\"No module named 'numba'\")\n", + " warnings.warn(\n", + "/Users/seanfreeman/mambaforge/envs/tobac_installed/lib/python3.11/site-packages/tobac/utils/decorators.py:366: UserWarning: Numba not able to be imported; periodic boundary calculations will be slower.Exception raised: ModuleNotFoundError(\"No module named 'numba'\")\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "import tobac\n", + "import dask.bag as db\n", + "import xarray as xr\n", + "import s3fs " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "There are many different ways to initialize a dask cluster. This is just one example, running two workers on a single local machine. " + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "
\n", + "
\n", + "

Client

\n", + "

Client-5a603f42-cd1a-11ee-a16f-06803d36ca4b

\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + "
\n", + "\n", + " \n", + "\n", + " \n", + "
\n", + "

Cluster Info

\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

LocalCluster

\n", + "

e2f9b06d

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "\n", + " \n", + "
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + " \n", + " Workers: 2\n", + "
\n", + " Total threads: 2\n", + " \n", + " Total memory: 64.00 GiB\n", + "
Status: runningUsing processes: True
\n", + "\n", + "
\n", + " \n", + "

Scheduler Info

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

Scheduler

\n", + "

Scheduler-b2b92b52-530a-4c41-b0b5-5da2c4cfcafb

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Comm: tcp://127.0.0.1:51973\n", + " \n", + " Workers: 2\n", + "
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + " \n", + " Total threads: 2\n", + "
\n", + " Started: Just now\n", + " \n", + " Total memory: 64.00 GiB\n", + "
\n", + "
\n", + "
\n", + "\n", + "
\n", + " \n", + "

Workers

\n", + "
\n", + "\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 0

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:51981\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:51982/status\n", + " \n", + " Memory: 32.00 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:51976\n", + "
\n", + " Local directory: /var/folders/bj/m6g82c6n41g83y3_dx02y7ch0000gp/T/dask-scratch-space/worker-pou0krcp\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 1

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:51980\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:51983/status\n", + " \n", + " Memory: 32.00 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:51978\n", + "
\n", + " Local directory: /var/folders/bj/m6g82c6n41g83y3_dx02y7ch0000gp/T/dask-scratch-space/worker-cygts80d\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from dask.distributed import Client, progress\n", + "client = Client(n_workers=2, threads_per_worker=1)\n", + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Read in Data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here, we are using the NOAA Global Mosaic of Geostationary Satellite Imagery (GMGSI) as our input data source from AWS s3." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "fs = s3fs.S3FileSystem(anon=True)\n", + "aws_urls = [\"s3://noaa-gmgsi-pds/GMGSI_LW/2024/01/01/00/GLOBCOMPLIR_nc.2024010100\",\n", + " \"s3://noaa-gmgsi-pds/GMGSI_LW/2024/01/01/01/GLOBCOMPLIR_nc.2024010101\"]\n", + "\n", + "all_ds = list()\n", + "for aws_url in aws_urls:\n", + " fileObj = fs.open(aws_url)\n", + " all_ds.append(xr.open_dataset(fileObj, engine='h5netcdf'))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We loaded in two files and we will use xarray to concatenate them." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "combined_ds = xr.concat(all_ds, dim='time')" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset>\n",
+       "Dimensions:  (time: 2, yc: 3000, xc: 4999)\n",
+       "Coordinates:\n",
+       "    lat      (yc, xc) float32 72.72 72.72 72.72 72.72 ... -72.74 -72.74 -72.74\n",
+       "    lon      (yc, xc) float32 180.0 -179.9 -179.9 -179.8 ... 179.8 179.8 179.9\n",
+       "  * time     (time) datetime64[ns] 2024-01-01 2024-01-01T01:00:00\n",
+       "Dimensions without coordinates: yc, xc\n",
+       "Data variables:\n",
+       "    data     (time, yc, xc) float32 206.0 204.0 204.0 ... 187.0 188.0 182.0\n",
+       "Attributes:\n",
+       "    Conventions:          CF-1.4\n",
+       "    Source:               McIDAS Area File\n",
+       "    Satellite Sensor:     DERIVED DATA\n",
+       "    time_coverage_start:  2024-01-01T00:00:00\n",
+       "    instrument_name:      GLOBCOMPLIR\n",
+       "    history:              Mon Jan  1 00:38:21 2024: ncks -d xc,0,4998 templir...\n",
+       "    NCO:                  netCDF Operators version 4.7.5 (Homepage = http://n...
" + ], + "text/plain": [ + "\n", + "Dimensions: (time: 2, yc: 3000, xc: 4999)\n", + "Coordinates:\n", + " lat (yc, xc) float32 72.72 72.72 72.72 72.72 ... -72.74 -72.74 -72.74\n", + " lon (yc, xc) float32 180.0 -179.9 -179.9 -179.8 ... 179.8 179.8 179.9\n", + " * time (time) datetime64[ns] 2024-01-01 2024-01-01T01:00:00\n", + "Dimensions without coordinates: yc, xc\n", + "Data variables:\n", + " data (time, yc, xc) float32 206.0 204.0 204.0 ... 187.0 188.0 182.0\n", + "Attributes:\n", + " Conventions: CF-1.4\n", + " Source: McIDAS Area File\n", + " Satellite Sensor: DERIVED DATA\n", + " time_coverage_start: 2024-01-01T00:00:00\n", + " instrument_name: GLOBCOMPLIR\n", + " history: Mon Jan 1 00:38:21 2024: ncks -d xc,0,4998 templir...\n", + " NCO: netCDF Operators version 4.7.5 (Homepage = http://n..." + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "combined_ds" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "These feature detection parameters are just examples." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## *tobac* Feature Detection" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "parameters_features={}\n", + "parameters_features['position_threshold']='weighted_diff'\n", + "parameters_features['sigma_threshold']=0.5\n", + "parameters_features['n_min_threshold']=4\n", + "parameters_features['target']='minimum'\n", + "parameters_features['threshold']=[180, 170]\n", + "parameters_features['PBC_flag']='hdim_2'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "While future versions (1.6 and greater) of *tobac* will support xarray natively in feature detection and segmentation, current versions of *tobac* rely on Iris for gridded data. Because of this, we have to make some conversions to have this data be compatible with iris. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "# iris issues\n", + "combined_ds['data'].attrs['units'] = 'kelvin'\n", + "combined_ds['data']['time'].attrs['long_name'] ='time'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, we will use a *dask bag* to parallelize our feature detection over time. " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/seanfreeman/mambaforge/envs/tobac_installed/lib/python3.11/site-packages/distributed/client.py:3162: UserWarning: Sending large graph of size 57.21 MiB.\n", + "This may cause some slowdown.\n", + "Consider scattering data ahead of time and using futures.\n", + " warnings.warn(\n", + "/Users/seanfreeman/mambaforge/envs/tobac_installed/lib/python3.11/site-packages/tobac/utils/decorators.py:366: UserWarning: Numba not able to be imported; periodic boundary calculations will be slower.Exception raised: ModuleNotFoundError(\"No module named 'numba'\")\n", + " warnings.warn(\n", + "/Users/seanfreeman/mambaforge/envs/tobac_installed/lib/python3.11/site-packages/tobac/utils/decorators.py:366: UserWarning: Numba not able to be imported; periodic boundary calculations will be slower.Exception raised: ModuleNotFoundError(\"No module named 'numba'\")\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "b = db.from_sequence([combined_ds['data'][x:x+1][0:500, 0:500] for x in range(len(combined_ds['time']))], npartitions=1)\n", + "out_feature_dfs = db.map(lambda x: tobac.feature_detection_multithreshold(x.to_iris(), 4000, **parameters_features), b).compute()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Combining parallel-detected features into one coherent DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": 
[ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
frameidxhdim_1hdim_2numthreshold_valuefeaturetimetimestrlatitudelongitude
0010.97532560.7181322618012024-01-01 00:00:002024-01-01 00:00:0072.694528-175.628134
1020.67001879.0748181618022024-01-01 00:00:002024-01-01 00:00:0072.701065-174.306289
2050.3581151492.9797781518032024-01-01 00:00:002024-01-01 00:00:0072.707742-72.492469
3060.4825791531.5202152618042024-01-01 00:00:002024-01-01 00:00:0072.705077-69.717214
4073.4098962113.18577028518052024-01-01 00:00:002024-01-01 00:00:0072.642292-27.832080
....................................
156511283498.18090630.0050311617015662024-01-01 01:00:002024-01-01 01:00:0058.262049-177.839756
156611284497.817803715.879216717015672024-01-01 01:00:002024-01-01 01:00:0058.275803-128.450668
156711285498.1582323295.1861716317015682024-01-01 01:00:002024-01-01 01:00:0058.26290857.282537
156811288498.4544043793.095530517015692024-01-01 01:00:002024-01-01 01:00:0058.25168293.136465
156911293498.6489113317.641704617015702024-01-01 01:00:002024-01-01 01:00:0058.24431058.899535
\n", + "

1570 rows × 11 columns

\n", + "
" + ], + "text/plain": [ + " frame idx hdim_1 hdim_2 num threshold_value feature \\\n", + "0 0 1 0.975325 60.718132 26 180 1 \n", + "1 0 2 0.670018 79.074818 16 180 2 \n", + "2 0 5 0.358115 1492.979778 15 180 3 \n", + "3 0 6 0.482579 1531.520215 26 180 4 \n", + "4 0 7 3.409896 2113.185770 285 180 5 \n", + "... ... ... ... ... ... ... ... \n", + "1565 1 1283 498.180906 30.005031 16 170 1566 \n", + "1566 1 1284 497.817803 715.879216 7 170 1567 \n", + "1567 1 1285 498.158232 3295.186171 63 170 1568 \n", + "1568 1 1288 498.454404 3793.095530 5 170 1569 \n", + "1569 1 1293 498.648911 3317.641704 6 170 1570 \n", + "\n", + " time timestr latitude longitude \n", + "0 2024-01-01 00:00:00 2024-01-01 00:00:00 72.694528 -175.628134 \n", + "1 2024-01-01 00:00:00 2024-01-01 00:00:00 72.701065 -174.306289 \n", + "2 2024-01-01 00:00:00 2024-01-01 00:00:00 72.707742 -72.492469 \n", + "3 2024-01-01 00:00:00 2024-01-01 00:00:00 72.705077 -69.717214 \n", + "4 2024-01-01 00:00:00 2024-01-01 00:00:00 72.642292 -27.832080 \n", + "... ... ... ... ... 
\n", + "1565 2024-01-01 01:00:00 2024-01-01 01:00:00 58.262049 -177.839756 \n", + "1566 2024-01-01 01:00:00 2024-01-01 01:00:00 58.275803 -128.450668 \n", + "1567 2024-01-01 01:00:00 2024-01-01 01:00:00 58.262908 57.282537 \n", + "1568 2024-01-01 01:00:00 2024-01-01 01:00:00 58.251682 93.136465 \n", + "1569 2024-01-01 01:00:00 2024-01-01 01:00:00 58.244310 58.899535 \n", + "\n", + "[1570 rows x 11 columns]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tobac.utils.general.combine_feature_dataframes(out_feature_dfs)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "tobac_installed", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}
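
The combination step used throughout this patch (:code:`tobac.utils.general.combine_feature_dataframes`) essentially concatenates the per-time feature dataframes returned by the parallel map and renumbers the :code:`feature` column so IDs remain unique across times. A minimal sketch of that pattern in plain :code:`pandas` (the toy dataframes and helper name are hypothetical illustrations, not tobac's actual implementation, which handles additional bookkeeping):

```python
import pandas as pd

# Hypothetical per-time feature dataframes, stand-ins for the output of
# feature detection run independently on each timestep.
df_t0 = pd.DataFrame({"frame": [0, 0], "hdim_1": [1.0, 2.0], "feature": [1, 2]})
df_t1 = pd.DataFrame({"frame": [1, 1], "hdim_1": [3.0, 4.0], "feature": [1, 2]})

def combine_feature_dfs_sketch(dfs):
    """Concatenate per-time feature dataframes and renumber features.

    Simplified stand-in for tobac.utils.general.combine_feature_dataframes:
    without renumbering, feature IDs from different timesteps would collide.
    """
    combined = pd.concat(dfs, ignore_index=True)
    # assign globally unique feature IDs across all timesteps
    combined["feature"] = range(1, len(combined) + 1)
    return combined

combined = combine_feature_dfs_sketch([df_t0, df_t1])
print(combined["feature"].tolist())  # → [1, 2, 3, 4]
```

The renumbered dataframe is then suitable as a single input to tracking, exactly as the documentation above describes for the real utility.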