Update parallel raster episode #113

Merged · 1 commit · May 8, 2024
`episodes/11-parallel-raster-computations.md` (199 changes: 60 additions, 139 deletions)
---
title: "Parallel raster computations using Dask"
teaching: 45
exercises: 10
---

:::questions
Being able to profile the computational time is thus essential, and we will see how to do that in a Jupyter environment
in the next section.

:::callout

## Introduce the Data
[a previous episode](./05-access-data.md). We will load data starting from the `search.json` file.

If you would like to work with the data for this lesson without downloading data on-the-fly, you can download the
raster data from [this figshare archive](https://figshare.com/ndownloader/files/36028100). Save the `geospatial-python-raster-dataset.tar.gz`
file in your current working directory, and extract the archive file by double-clicking on it or by running the
following command in your terminal `tar -zxvf geospatial-python-raster-dataset.tar.gz`. Use the file `geospatial-python-raster-dataset/search.json`
(instead of `search.json`) to get started with this lesson.
items = pystac.ItemCollection.from_file("search.json")
```

We select the last scene, extract the URLs of the red ("red") and near-infrared ("nir") bands, open them with
`rioxarray` and save them to disk:

```python
import rioxarray

rioxarray.open_rasterio(items[-1].assets["red"].href).rio.to_raster("red.tif", driver="COG")
rioxarray.open_rasterio(items[-1].assets["nir"].href).rio.to_raster("nir.tif", driver="COG")
```
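
As a quick check, we can re-open the saved files and inspect their shape and resolution (a minimal sketch, assuming the
two GeoTIFFs were written to the current working directory; `red_check` is a hypothetical variable name):

```python
# Re-open one of the rasters we just saved and verify it looks as expected.
red_check = rioxarray.open_rasterio("red.tif")
print(red_check.shape)             # (bands, y, x)
print(red_check.rio.resolution())  # pixel size in the raster's CRS units
```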

## Dask-powered rasters

### Chunked arrays

As we have mentioned, `rioxarray` supports the use of Dask's chunked arrays as underlying data structure. When opening
a raster file with `open_rasterio` and providing the `chunks` argument, Dask arrays are employed instead of regular
Numpy arrays. `chunks` describes the shape of the blocks into which the data will be split. As an example, we
open the red band raster using a chunk shape of `(1, 4000, 4000)` (block size of `1` in the first dimension and
of `4000` in the second and third dimensions):

```python
red = rioxarray.open_rasterio("red.tif", chunks=(1, 4000, 4000))
```

Xarray and Dask also provide a graphical representation of the raster data array and of its blocked structure.
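
In addition to the rich HTML view that Jupyter renders for a chunked `DataArray`, we can inspect the chunked structure
programmatically (a minimal sketch, reusing the `red` array opened above):

```python
# The underlying data structure is now a Dask array rather than a
# NumPy array, split into blocks of (at most) the requested shape.
print(type(red.data))  # dask.array.core.Array
print(red.chunks)      # chunk sizes along each dimension
```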

### Exercise: Chunk sizes matter

We have already seen how COGs are regular GeoTIFF files with a special internal structure. Another feature of COGs is
that data is organized in blocks that can be accessed remotely via independent HTTP requests, enabling partial file
readings. This is useful if you want to access only a portion of your raster file, but it also allows for efficient
parallel reading. You can check the blocksize employed in a COG file with the following code snippet:

```python
import rasterio
with rasterio.open("/path/or/URL/to/file.tif") as r:
    if r.is_tiled:
        print(f"Chunk size: {r.block_shapes}")
```

In order to optimally access COGs it is best to align the blocksize of the file with the chunks employed when loading
the file. Which other elements do you think should be considered when choosing the chunk size? What do you think are
suitable chunk sizes for the red band raster?

::::solution

### Solution

```python
import rasterio
with rasterio.open("red.tif") as r:
    if r.is_tiled:
        print(f"Chunk size: {r.block_shapes}")
```
Chunk size: [(1024, 1024)]
```

Ideal chunk size values for this raster are multiples of 1024. An element to consider is the number of
resulting chunks and their size. While the optimal chunk size strongly depends on the specific application, chunks
should in general not be too big nor too small (i.e. too many). As a rule of thumb, chunk sizes of 100 MB typically
work well with Dask (see, e.g., this [blog post](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes)). Also,
the shape might be relevant, depending on the application! Here, we might select a chunk shape of
`(1, 6144, 6144)`:

```python
red = rioxarray.open_rasterio("red.tif", chunks=(1, 6144, 6144))
```

which leads to chunks 72 MB large (1 x 6144 x 6144 x 2 bytes / 2^20 = 72 MB). Also, we can let `rioxarray` and Dask
figure out appropriate chunk shapes by setting `chunks="auto"`:

```python
red = rioxarray.open_rasterio("red.tif", chunks="auto")
```

which leads to `(1, 8192, 8192)` chunks (128 MB).
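
As a cross-check of the arithmetic above, here is a minimal sketch, assuming the red band is stored as 16-bit
integers (2 bytes per pixel):

```python
import numpy as np

# Chunk memory footprint = number of elements x bytes per element.
for chunk_shape in [(1, 6144, 6144), (1, 8192, 8192)]:
    size_mb = np.prod(chunk_shape) * 2 / 2**20
    print(chunk_shape, f"{size_mb:.0f} MB")  # 72 MB and 128 MB
```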
coordinates how the operations should be executed on the individual chunks of data, and runs these tasks in parallel as
much as possible.

Let us set up an example where we calculate the NDVI for a full Sentinel-2 tile, and try to estimate the performance gain by running the calculation in parallel on a multi-core CPU.

To run the calculation serially, we open the relevant raster bands, as we have learned in the previous episodes:

```python
red = rioxarray.open_rasterio('red.tif', masked=True)
nir = rioxarray.open_rasterio('nir.tif', masked=True)
```
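
To get a rough idea of the workload involved, we can check the size of one band once loaded in memory (a minimal
sketch; note that `masked=True` converts the values to floating point, which increases the memory footprint):

```python
# Array size is shape x bytes per element; no pixel data needs to be
# read to compute it.
print(red.shape, red.dtype)
print(f"{red.nbytes / 2**20:.0f} MB")
```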

We then compute the NDVI. Note the Jupyter magic `%%time`, which returns the time required to run the content of a
cell (note that commands starting with `%%` need to be on the first line of the cell!):

```python
%%time
ndvi = (nir - red)/(nir + red)
```

```output
CPU times: user 4.99 s, sys: 3.44 s, total: 8.43 s
Wall time: 8.53 s
```

We note down the calculation's "Wall time" (actual time to perform the task).

Now we run the same task in parallel using Dask. To do so, we open the relevant rasters as chunked arrays:

```python
red_dask = rioxarray.open_rasterio('red.tif', masked=True, lock=False, chunks=(1, 6144, 6144))
nir_dask = rioxarray.open_rasterio('nir.tif', masked=True, lock=False, chunks=(1, 6144, 6144))
```

Setting `lock=False` tells `rioxarray` that the individual data chunks can be loaded simultaneously from the source by
the Dask workers.

We now continue to the actual calculation. Note how the same syntax as for the serial version is employed for
computing the NDVI. Don't forget to add the Jupyter magic to record the timing!

```python
%%time
ndvi_dask = (nir_dask - red_dask)/(nir_dask + red_dask)
```

```output
CPU times: user 7.71 ms, sys: 1.71 ms, total: 9.42 ms
Wall time: 8.61 ms
```

Did we just observe a 1000x speed-up when comparing to the serial calculation (~8 s vs ~8 ms)? Actually, no
calculation has run yet. This is because operations performed on Dask arrays are executed lazily, i.e. they are not
immediately run.
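
We can verify this by checking that the result is still a chunked Dask array rather than a NumPy array loaded in
memory (a minimal sketch, reusing the names above):

```python
# No chunk has been read or processed yet: the object only records
# what to compute, not the computed values.
print(type(ndvi_dask.data))  # dask.array.core.Array
```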

:::callout

```python
import dask
dask.visualize(ndvi_dask)
```

![Dask graph](fig/E11/dask-graph.png){alt="dask graph"}

The task graph gives Dask the complete "overview" of the calculation, thus enabling a better management of tasks and
resources when dispatching calculations to be run in parallel.
:::

Most methods of `DataArray` objects run operations lazily when Dask arrays are employed. In order to trigger calculations, we
can use either `.persist()` or `.compute()`. The former keeps data in the form of chunked Dask arrays, and it should
thus be used to run intermediate steps that will be followed by additional calculations. The latter merges instead the
chunks in a single Numpy array, and it should be used at the very end of a sequence of calculations. Both methods
accept the same parameters. Here, we explicitly tell Dask to parallelize the required workload over 4 threads. Let's again time the cell
execution:

```python
%%time
ndvi_dask = ndvi_dask.persist(scheduler="threads", num_workers=4)
```

```output
CPU times: user 4.18 s, sys: 2.19 s, total: 6.37 s
Wall time: 2.32 s
```

The timing that we have recorded makes much more sense now. When running the task on a 4-core CPU laptop, we observe a
x3.6 speed-up when comparing to the analogous serial calculation (8.53 s vs 2.32 s).
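
For comparison, here is a minimal sketch of triggering the same calculation with `.compute()`, which merges the
chunks into a single in-memory NumPy array (names reused from above):

```python
# Unlike .persist(), .compute() returns a NumPy-backed DataArray,
# so it is best reserved for the last step of a workflow.
ndvi_numpy = ndvi_dask.compute(scheduler="threads", num_workers=4)
print(type(ndvi_numpy.data))  # numpy.ndarray
```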

Once again, we stress that one does not always obtain similar performance gains by exploiting the Dask-based
parallelization. Even if the algorithm employed is well suited for parallelization, Dask introduces some overhead time
to manage the tasks in the Dask graph. This overhead, which is typically of the order of few milliseconds per task, can
be larger than the parallelization gain. This is the typical situation with calculations with many small chunks.
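
A rough way to gauge this trade-off is to count the tasks that our chunking choice generates (a minimal sketch,
reusing the chunked `nir_dask` array from above):

```python
import numpy as np

# Each chunk corresponds to at least one task in the Dask graph; with
# per-task overheads of the order of milliseconds, thousands of tiny
# chunks can easily outweigh the parallelization gain.
n_chunks = np.prod(nir_dask.data.numblocks)
print(f"number of chunks per band: {n_chunks}")
```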

Finally, let's have a look at how Dask can be used to save raster files. When calling `.to_raster()`, we provide the
additional argument `lock=threading.Lock()`. This is because the threads which are splitting the workload must
"synchronise" when writing to the same file (they might otherwise overwrite each other's output).

```python
from threading import Lock
ndvi_dask.rio.to_raster('ndvi.tif', driver='COG', lock=Lock())
```

Note that `.to_raster()` is among the methods that trigger immediate calculations (one can change this behaviour by
specifying `compute=False`).
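
A minimal sketch of the delayed-write variant, using the hypothetical output name `ndvi_delayed.tif`:

```python
from threading import Lock

# With compute=False, .to_raster() returns a Dask Delayed object;
# the file is written only when we explicitly trigger the computation.
delayed_write = ndvi_dask.rio.to_raster(
    "ndvi_delayed.tif", driver="COG", lock=Lock(), compute=False
)
delayed_write.compute()
```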

:::keypoints
- The `%%time` Jupyter magic command can be used to profile calculations.
Binary file modified: episodes/fig/E11/dask-graph.png
Binary file removed: episodes/fig/E11/true-color-image.png
Binary file removed: episodes/fig/E11/true-color-image_median-filter.png