Merge pull request #64 from ec-jrc/development
New tool catchstats and updates to ncextract tool
doc78 authored May 8, 2024
2 parents 55e9e0d + d4c5a1b commit 506b693
Showing 22 changed files with 713 additions and 340 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -55,4 +55,6 @@ tests/data/gridding/meteo_out/**/*.nc
tests/data/gridding/meteo_out/**/*.tiff
.settings
.project
.pydevproject
.pydevproject

.ipynb_checkpoints/
298 changes: 208 additions & 90 deletions README.md

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions bin/catchstats
@@ -0,0 +1,14 @@
#!python

import os
import sys

# When executed from a source checkout, make the package importable from ../src
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.normpath(os.path.join(current_dir, '../src/'))
if os.path.exists(src_dir):
    sys.path.append(src_dir)

from lisfloodutilities.catchstats import main_script

if __name__ == '__main__':
    main_script()
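Once installed, this entry point resolves to lisfloodutilities.catchstats.main_script, so the tool can be invoked as, for example, catchstats -i <input_dir> -m <mask_dir> -s mean -o <output_dir> (an illustrative call: the flags are defined in the argument parser of catchstats.py further down, and the directory placeholders are hypothetical).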
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,5 +1,6 @@
attrs>=19.3.0
cffi>=1.15.1
cfgrib>=0.9.11.0
cftime>=1.0.4.2
cloudpickle>=2.2.1
coverage>=6.0
7 changes: 5 additions & 2 deletions setup.py
@@ -128,7 +128,10 @@ def run(self):
        'nc2pcr: Convert netCDF files to PCRaster format; '
        'cutmaps: cut netCDF files; '
        'compare: compare two sets of netCDF files; '
        'ncextract: extract values from netCDF files; ',
        'thresholds: compute discharge return period thresholds; '
        'gridding: interpolate meteo variables observations; '
        'ncextract: extract values from netCDF files; '
        'catchstats: compute catchment statistics; ',
    long_description=long_description,
    long_description_content_type='text/markdown',
    setup_requires=[
@@ -145,7 +148,7 @@ def run(self):
    keywords=['netCDF4', 'PCRaster', 'mapstack', 'lisflood', 'efas', 'glofas', 'ecmwf', 'copernicus'],
    license='EUPL 1.2',
    url='https://github.com/ec-jrc/lisflood-utilities',
    scripts=['bin/pcr2nc', 'bin/cutmaps', 'bin/compare', 'bin/nc2pcr', 'bin/thresholds', 'bin/gridding', 'bin/cddmap', 'bin/ncextract',],
    scripts=['bin/pcr2nc', 'bin/cutmaps', 'bin/compare', 'bin/nc2pcr', 'bin/thresholds', 'bin/gridding', 'bin/cddmap', 'bin/ncextract', 'bin/catchstats',],
    zip_safe=True,
    classifiers=[
        # complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers
18 changes: 18 additions & 0 deletions src/lisfloodutilities/catchstats/__init__.py
@@ -0,0 +1,18 @@
"""
Copyright 2019-2023 European Union
Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence");
You may not use this work except in compliance with the Licence.
You may obtain a copy of the Licence at:
https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt
Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the Licence for the specific language governing permissions and limitations under the Licence.
"""

from .catchstats import *
272 changes: 272 additions & 0 deletions src/lisfloodutilities/catchstats/catchstats.py
@@ -0,0 +1,272 @@
"""
Copyright 2019-2023 European Union
Licensed under the EUPL, Version 1.2 or as soon they will be approved by the European Commission subsequent versions of the EUPL (the "Licence");
You may not use this work except in compliance with the Licence.
You may obtain a copy of the Licence at:
https://joinup.ec.europa.eu/sites/default/files/inline-files/EUPL%20v1_2%20EN(1).txt
Unless required by applicable law or agreed to in writing, software distributed under the Licence is distributed on an "AS IS" basis,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the Licence for the specific language governing permissions and limitations under the Licence.
"""

import argparse
import os
from pathlib import Path
import pandas as pd
import sys
import time
import xarray as xr
from typing import Dict, List, Union, Optional
# from tqdm.auto import tqdm


def read_inputmaps(inputmaps: Union[str, Path]) -> xr.Dataset:
    """Reads the input maps in NetCDF format from the input directory

    Parameters:
    -----------
    inputmaps: str or pathlib.Path
        directory that contains the input NetCDF files whose statistics will be computed. These files can be static (without time dimension) or dynamic (with time dimension)

    Returns:
    --------
    ds: xr.Dataset
    """

    inputmaps = Path(inputmaps)
    if not inputmaps.is_dir():
        print(f'ERROR: {inputmaps} is missing or not a directory!')
        sys.exit(1)

    filepaths = list(inputmaps.glob('*.nc'))
    if not filepaths:
        print(f'ERROR: No NetCDF files found in "{inputmaps}"')
        sys.exit(2)

    print(f'{len(filepaths)} input NetCDF files found in "{inputmaps}"')

    try:
        # for dynamic maps: open all files as one dataset
        # chunks is set to auto for general purpose processing;
        # it could be optimized depending on the input NetCDF
        ds = xr.open_mfdataset(filepaths, chunks='auto', parallel=True, engine='netcdf4')
    except Exception:
        # for static maps: one variable per file, named after the file stem
        ds = xr.Dataset({file.stem.split('_')[0]: xr.open_dataset(file, engine='netcdf4')['Band1'] for file in filepaths})
        if 'wgs_1984' in ds:
            ds = ds.drop_vars('wgs_1984')

    return ds

def read_masks(mask: Union[str, Path]) -> Dict[int, xr.DataArray]:
    """Loads the catchment masks in NetCDF format from the input directory

    Parameters:
    -----------
    mask: str or pathlib.Path
        directory that contains the NetCDF files that define the catchment boundaries. These files can be the output of the `cutmaps` tool

    Returns:
    --------
    masks: dictionary of xr.DataArray
        keys are the catchment IDs and values the boolean maps of each catchment
    """

    # check masks
    mask = Path(mask)
    if not mask.is_dir():
        print(f'ERROR: {mask} is not a directory!')
        sys.exit(1)

    maskpaths = list(mask.glob('*.nc'))
    if not maskpaths:
        print(f'ERROR: No NetCDF files found in "{mask}"')
        sys.exit(2)

    print(f'{len(maskpaths)} mask NetCDF files found in "{mask}"')

    # load masks
    masks = {}
    for maskpath in maskpaths:
        ID = int(maskpath.stem)
        try:
            try:
                aoi = xr.open_dataset(maskpath, engine='netcdf4')['Band1']
            except Exception:
                aoi = xr.open_dataarray(maskpath, engine='netcdf4')
            # convert to a 1/NaN mask: 1 inside the catchment, NaN elsewhere
            aoi = xr.where(aoi.notnull(), 1, aoi)
            masks[ID] = aoi
        except Exception as e:
            print(f'ERROR: The mask {maskpath} could not be read: {e}')
            continue

    return masks

def read_pixarea(pixarea: Union[str, Path]) -> xr.DataArray:
    """Reads the LISFLOOD pixel area static map

    Parameters:
    -----------
    pixarea: string or Path
        a NetCDF file with pixel area used to compute weighted statistics. It is specifically meant for geographic projection systems, where the area of a pixel varies with latitude

    Returns:
    --------
    weight: xr.DataArray
    """

    pixarea = Path(pixarea)
    if not pixarea.is_file():
        print(f'ERROR: {pixarea} is not a file!')
        sys.exit(1)

    try:
        weight = xr.open_dataset(pixarea, engine='netcdf4')['Band1']
    except Exception as e:
        print(f'ERROR: The weighting map "{pixarea}" could not be loaded: {e}')
        sys.exit(2)

    return weight

def catchment_statistics(maps: Union[xr.DataArray, xr.Dataset],
                         masks: Dict[int, xr.DataArray],
                         statistic: Union[str, List[str]],
                         weight: Optional[xr.DataArray] = None,
                         output: Optional[Union[str, Path]] = None,
                         overwrite: bool = False
                         ) -> Optional[xr.Dataset]:
    """
    Given a set of input maps and catchment masks, it computes catchment statistics.

    Parameters:
    -----------
    maps: xarray.DataArray or xarray.Dataset
        map or set of maps from which catchment statistics will be computed
    masks: dictionary of xr.DataArray
        a set of catchment masks. The tool `cutmaps` in this repository can be used to generate them
    statistic: string or list of strings
        statistics to be computed. Only these statistics are available: 'mean', 'sum', 'std', 'var', 'min', 'max', 'median', 'count'
    weight: xr.DataArray, optional
        map used to weight each pixel in "maps" before computing the statistics. It is meant to take into account the varying pixel area in geographic projections
    output: str or pathlib.Path, optional
        directory where the resulting NetCDF files will be saved. If not provided, the results are returned as an xr.Dataset
    overwrite: boolean
        whether to overwrite catchments whose output NetCDF file already exists. The default is False, in which case those catchments are skipped

    Returns:
    --------
    An xr.Dataset of all catchment statistics, or a NetCDF file for each catchment in the "masks" dictionary
    """

    start_time = time.perf_counter()

    if isinstance(maps, xr.DataArray):
        maps = xr.Dataset({maps.name: maps})

    # check statistic
    if isinstance(statistic, str):
        statistic = [statistic]
    possible_stats = ['mean', 'sum', 'std', 'var', 'min', 'max', 'median', 'count']
    assert all(stat in possible_stats for stat in statistic), "All values in 'statistic' should be one of these: {0}".format(', '.join(possible_stats))
    stats_dict = {var: statistic for var in maps}

    # output directory
    if output is None:
        results = []
    else:
        output = Path(output)
        output.mkdir(parents=True, exist_ok=True)

    # define coordinates and variables of the resulting Dataset
    dims = dict(maps.dims)
    dimnames = [dim.lower() for dim in dims]
    if 'lat' in dimnames and 'lon' in dimnames:
        x_dim, y_dim = 'lon', 'lat'
    else:
        x_dim, y_dim = 'x', 'y'
    del dims[x_dim]
    del dims[y_dim]
    coords = {dim: maps[dim] for dim in dims}
    variables = [f'{var}_{stat}' for var, stats in stats_dict.items() for stat in stats]

    # compute statistics for each catchment
    # for ID in tqdm(masks.keys(), desc='processing catchments'):
    for ID in masks.keys():

        if output is not None:
            fileout = output / f'{ID:04}.nc'
            if fileout.exists() and not overwrite:
                print(f'Output file {fileout} already exists. Moving forward to the next catchment')
                continue

        # create empty Dataset
        coords.update({'id': [ID]})
        maps_aoi = xr.Dataset({var: xr.DataArray(coords=coords, dims=coords.keys()) for var in variables})

        # apply mask to the dataset
        aoi = masks[ID]
        masked_maps = maps.sel({x_dim: aoi[x_dim], y_dim: aoi[y_dim]}).where(aoi == 1)
        masked_maps = masked_maps.compute()

        # apply weighting
        if weight is not None:
            masked_weight = weight.sel({x_dim: aoi[x_dim], y_dim: aoi[y_dim]}).where(aoi == 1)
            weighted_maps = masked_maps.weighted(masked_weight.fillna(0))

        # compute statistics
        for var, stats in stats_dict.items():
            for stat in stats:
                if (stat in ['mean', 'sum', 'std', 'var']) and (weight is not None):
                    # weighted reductions are only defined for these four statistics
                    x = getattr(weighted_maps, stat)(dim=[x_dim, y_dim])[var]
                else:
                    x = getattr(masked_maps, stat)(dim=[x_dim, y_dim])[var]
                maps_aoi[f'{var}_{stat}'].loc[{'id': ID}] = x

        # save results
        if output is None:
            results.append(maps_aoi)
        else:
            maps_aoi.to_netcdf(fileout)

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Time elapsed: {elapsed_time:0.2f} seconds")

    if output is None:
        results = xr.concat(results, dim='id')
        return results

def main(argv=sys.argv):
    prog = os.path.basename(argv[0])
    parser = argparse.ArgumentParser(
        description="""
        Utility to compute catchment statistics from (multiple) NetCDF files.
        The mask maps are NetCDF files with values in the area of interest and NaN elsewhere.
        The area map is optional and accounts for varying pixel area with latitude.
        """,
        prog=prog,
    )
    parser.add_argument("-i", "--input", required=True, help="Directory containing the input NetCDF files")
    parser.add_argument("-m", "--mask", required=True, help="Directory containing the mask NetCDF files")
    parser.add_argument("-s", "--statistic", nargs='+', required=True, help='List of statistics to be computed. Possible values: mean, sum, std, var, min, max, median, count')
    parser.add_argument("-o", "--output", required=True, help="Directory where the output NetCDF files will be saved")
    parser.add_argument("-a", "--area", required=False, default=None, help="NetCDF file of pixel area used to weight the statistics")
    parser.add_argument("-W", "--overwrite", action="store_true", help="Overwrite existing output files")

    args = parser.parse_args()

    try:
        maps = read_inputmaps(args.input)
        masks = read_masks(args.mask)
        weight = read_pixarea(args.area) if args.area is not None else None
        catchment_statistics(maps, masks, args.statistic, weight=weight, output=args.output, overwrite=args.overwrite)
    except Exception as e:
        print(f'ERROR: {e}')
        sys.exit(1)

def main_script():
    sys.exit(main())

if __name__ == "__main__":
    main_script()
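For completeness, the new module can also be driven directly from Python rather than the command line. A minimal sketch, assuming a hypothetical layout with input maps in maps/, one mask per catchment in masks/, and an optional pixel-area file pixarea.nc:

from lisfloodutilities.catchstats import read_inputmaps, read_masks, read_pixarea, catchment_statistics

maps = read_inputmaps('maps')        # xr.Dataset with one variable per input map
masks = read_masks('masks')          # {catchment ID: mask DataArray}
weight = read_pixarea('pixarea.nc')  # pixel-area weights (optional)

# with output=None, the statistics are returned as a single xr.Dataset
stats = catchment_statistics(maps, masks, ['mean', 'max'], weight=weight, output=None)
print(stats)

This mirrors what main() does with the parsed command-line arguments, except that main() always writes one NetCDF file per catchment because --output is required.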
8 changes: 4 additions & 4 deletions src/lisfloodutilities/cutmaps/main.py
@@ -70,9 +70,9 @@ def add_args(self):

        group_filelist.add_argument("-f", "--folder", help='Directory with netCDF files to be cut')
        group_filelist.add_argument("-F", "--file", help='netCDF file to be cut')
        group_filelist.add_argument("-S", "--static-data",
                                    help='Directory with EFAS/GloFAS static maps. '
                                         'Output files will have same directories structure')
        group_filelist.add_argument("-S", "--subdir",
                                    help='Directory containing folders with netCDF files to be cut. '
                                         'Output files will keep the same directory-folder structure')

        self.add_argument("-o", "--outpath", help='path where to save cut files',
                          default='./cutmaps_out', required=True)
@@ -93,7 +93,7 @@ def main(cliargs):

    input_folder = args.folder
    input_file = args.file
    static_data_folder = args.static_data
    static_data_folder = args.subdir
    overwrite = args.overwrite
    pathout = args.outpath
    if not os.path.exists(pathout):
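Note that only the long option name changes in this diff: calls using the short flag, e.g. cutmaps -S <dir> -o <out_dir>, keep working, while scripts passing --static-data must switch to --subdir (illustrative placeholders; the other cutmaps arguments are unchanged).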
