
Create KIWIS interpreter and filter.
gnrgomes committed Jan 3, 2024
1 parent 394859c commit c7d576a
Showing 5 changed files with 329 additions and 151 deletions.
@@ -1,6 +1,7 @@
[GENERIC]

INPUT_WILDCARD=*.tiff
INPUT_WILDCARD = ??????????00_all.kiwis
INPUT_TIMESTAMP_PATTERN = %%Y%%m%%d%%H%%M

NETCDF_REFERENCE=A European daily high-resolution gridded meteorological data set for 1990 - 2022
NETCDF_TITLE = Lisflood meteo maps 1990-2023 for European setting Feb. 2023
@@ -23,6 +24,7 @@ VALUE_OFFSET = 0.0
DATA_TYPE_PACKED = i2
STANDARD_NAME = DUMMY_STANDARD_NAME
LONG_NAME = DUMMY LONG NAME
KIWIS_FILTER_COLUMNS = {'COL_LAT': 'station_local_y', 'COL_LON': 'station_local_x', 'COL_IS_IN_DOMAIN': 'EFAS_ADDATTR_ISINNEWDOMAIN'}
KIWIS_FILTER_PLUGIN_CLASSES = {'KiwisFilter': {}}

[DIMENSION]
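The two new [GENERIC] keys above store Python dict literals: KIWIS_FILTER_COLUMNS remaps the default kiwis column names and KIWIS_FILTER_PLUGIN_CLASSES lists the filter classes to apply together with their arguments. A minimal sketch of reading them back into dicts, assuming a plain ConfigParser and an illustrative file path (the project's own Config class is not shown in this diff):

```python
# Minimal sketch, not the project's Config class: parse the new dict-valued
# [GENERIC] keys back into Python dicts. The config path is a placeholder.
from ast import literal_eval
from configparser import ConfigParser

parser = ConfigParser()
parser.read('config/1arcmin/config.ini')  # hypothetical path

filter_columns = literal_eval(parser.get('GENERIC', 'KIWIS_FILTER_COLUMNS'))
plugin_classes = literal_eval(parser.get('GENERIC', 'KIWIS_FILTER_PLUGIN_CLASSES'))

print(filter_columns['COL_LAT'])  # -> 'station_local_y'
print(plugin_classes)             # -> {'KiwisFilter': {}}
```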
@@ -1,6 +1,7 @@
[GENERIC]

INPUT_WILDCARD=*.tiff
INPUT_WILDCARD = ??????????00_all.kiwis
INPUT_TIMESTAMP_PATTERN = %%Y%%m%%d%%H%%M

NETCDF_REFERENCE=A European daily high-resolution gridded meteorological data set for 1990 - 2022
NETCDF_TITLE = Lisflood meteo maps 1990-2023 for European setting Feb. 2023
@@ -24,6 +25,7 @@ VALUE_OFFSET = 0.0
DATA_TYPE_PACKED = i2
STANDARD_NAME = DUMMY_STANDARD_NAME
LONG_NAME = DUMMY LONG NAME
KIWIS_FILTER_COLUMNS = {'COL_LAT': 'station_local_y', 'COL_LON': 'station_local_x', 'COL_IS_IN_DOMAIN': 'EFAS_ADDATTR_ISINNEWDOMAIN'}
KIWIS_FILTER_PLUGIN_CLASSES = {'KiwisFilter': {}}

[DIMENSION]
66 changes: 40 additions & 26 deletions src/lisfloodutilities/gridding/generate_grids.py
@@ -21,7 +21,7 @@
from pathlib import Path
from argparse import ArgumentParser, ArgumentTypeError
from datetime import datetime, timedelta
from lisfloodutilities.gridding.lib.utils import Printable, Dem, Config, FileUtils, GriddingUtils # , KiwisLoader
from lisfloodutilities.gridding.lib.utils import Printable, Dem, Config, FileUtils, GriddingUtils, KiwisLoader
from lisfloodutilities.gridding.lib.writers import NetCDFWriter, GDALWriter


@@ -46,7 +46,7 @@ def memory_save_mode_type(mode: str) -> str:
return mode

def run(config_filename: str, infolder: str, output_file: str, processing_dates_file: str, file_utils: FileUtils,
output_tiff: bool, overwrite_output: bool, start_date: datetime = None, end_date: datetime = None,
output_tiff: bool, output_netcdf: bool, overwrite_output: bool, start_date: datetime = None, end_date: datetime = None,
interpolation_mode: str = 'adw', use_broadcasting: bool = False, memory_save_mode: str = None):
"""
Interpolate text files containing (x, y, value) using inverse distance interpolation.
@@ -83,24 +83,24 @@ def run(config_filename: str, infolder: str, output_file: str, processing_dates_
outfile = output_file
if output_tiff:
output_writer_tiff = GDALWriter(conf, overwrite_output, quiet_mode)
output_writer_netcdf = NetCDFWriter(conf, overwrite_output, quiet_mode)
output_writer_netcdf.open(Path(outfile))
# file_loader = KiwisLoader(conf, overwrite_output, Path(infolder), quiet_mode)
# for filename in file_loader:
for filename in sorted(Path(infolder).rglob(inwildcard)):
if output_netcdf:
output_writer_netcdf = NetCDFWriter(conf, overwrite_output, quiet_mode)
output_writer_netcdf.open(Path(outfile))
file_loader = KiwisLoader(conf, Path(infolder), overwrite_output, quiet_mode)
for filename in file_loader:
file_timestamp = file_utils.get_timestamp_from_filename(filename) + timedelta(days=netcdf_offset_file_date)
if not file_utils.processable_file(file_timestamp, dates_to_process, conf.start_date, conf.end_date):
continue # Skip processing file
print_msg(f'Processing file: {filename}')
if output_tiff:
outfilepath = filename.with_suffix('.tiff')
output_writer_tiff.open(outfilepath)
grid_data = grid_utils.generate_grid(filename)
output_writer_netcdf.write(grid_data, file_timestamp)
if output_netcdf:
output_writer_netcdf.write(grid_data, file_timestamp)
if output_tiff:
output_writer_tiff.write(grid_data, file_timestamp)
output_writer_tiff.close()
output_writer_netcdf.close()
if output_netcdf:
output_writer_netcdf.close()
print_msg('Finished writing files')
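The rewritten run() streams input files from a KiwisLoader and only instantiates the writers whose output format was requested. KiwisLoader itself lives in lisfloodutilities.gridding.lib.utils and its implementation is not part of the hunks shown here; the snippet below is a hedged stand-in that only illustrates the iteration contract the loop relies on (one kiwis file path per timestep, from a single file or a whole folder):

```python
# Hedged stand-in for the iteration contract used by run(); the real
# KiwisLoader also receives the Config object and other flags.
from pathlib import Path
from typing import Iterator


class SimpleKiwisLoader:
    def __init__(self, in_path: Path, wildcard: str = '??????????00_all.kiwis'):
        self.in_path = Path(in_path)
        self.wildcard = wildcard

    def __iter__(self) -> Iterator[Path]:
        if self.in_path.is_file():
            # single-file mode introduced by the new -i/--in argument
            yield self.in_path
            return
        # folder mode mirrors the previous sorted(Path(infolder).rglob(inwildcard)) loop
        yield from sorted(self.in_path.rglob(self.wildcard))


for filename in SimpleKiwisLoader(Path('/data/kiwis')):  # placeholder folder
    print(filename)
```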


@@ -136,19 +136,20 @@ def main(argv):
# set defaults
parser.set_defaults(quiet=False,
out_tiff=False,
out_netcdf=False,
overwrite_output=False,
start_date='',
end_date=END_DATE_DEFAULT,
interpolation_mode='adw',
use_broadcasting=False,
memory_save_mode='0')

parser.add_argument("-i", "--in", dest="infolder", required=True, type=FileUtils.folder_type,
help="Set input folder path with kiwis/point files",
metavar="input_folder")
parser.add_argument("-o", "--out", dest="output_file", required=True, type=FileUtils.file_or_folder,
parser.add_argument("-i", "--in", dest="in_file_or_folder", required=True, type=FileUtils.file_or_folder,
help="Set a single input kiwis file or folder path containing all the kiwis files.",
metavar="/path/to/pr200102150600_all.kiwis")
parser.add_argument("-o", "--out", dest="output_file", required=False, type=FileUtils.file_type,
help="Set the output netCDF file path containing all the timesteps between start and end dates.",
metavar="output_netcdf_file")
metavar="/path/to/pr2001.nc")
parser.add_argument("-c", "--conf", dest="config_type", required=True,
help="Set the grid configuration type to use.",
metavar="{5x5km, 1arcmin,...}")
@@ -169,7 +170,9 @@
metavar="YYYYMMDDHHMISS")
parser.add_argument("-q", "--quiet", dest="quiet", action="store_true", help="Set script output into quiet mode [default: %(default)s]")
parser.add_argument("-t", "--tiff", dest="out_tiff", action="store_true",
help="Outputs a tiff file per timestep and also the single netCDF with all the timesteps [default: %(default)s]")
help="Outputs a tiff file per timestep [default: %(default)s]")
parser.add_argument("-n", "--netcdf", dest="out_netcdf", action="store_true",
help="Outputs a single netCDF with all the timesteps [default: %(default)s]")
parser.add_argument("-f", "--force", dest="overwrite_output", action="store_true",
help="Force write to existing file. TIFF files will be overwritten and netCDF file will be appended. [default: %(default)s]")
parser.add_argument("-m", "--mode", dest="interpolation_mode", required=False, type=interpolation_mode_type,
@@ -195,6 +198,22 @@

config_filename = file_utils.get_config_file(config_type_path)

if not args.out_tiff and not args.out_netcdf:
parser.error(f'You must choose at least one output format, TIFF (--tiff) and/or netCDF (--netcdf)')
if args.out_netcdf:
if args.output_file is None:
parser.error("--netcdf requires defining the output file with --out.")
else:
print_msg("Output Type: netCDF")
print_msg(f"Output File: {args.output_file}")
# TIFF output is written to the folder where each of the kiwis files exist
if args.out_tiff:
print_msg("Output Type: TIFF")
output_path = Path(args.in_file_or_folder)
if output_path.is_file():
output_path = output_path.parent
print_msg(f"Output Folder: {output_path}")
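The block above enforces the new output flags: at least one of --tiff/--netcdf must be chosen, and --netcdf only works together with --out. A tiny self-contained illustration of that dependency check, using placeholder arguments rather than the script's full argument list:

```python
# Stand-alone illustration of the option dependency enforced above; not the
# script's full argument list.
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('-o', '--out', dest='output_file')
parser.add_argument('-t', '--tiff', dest='out_tiff', action='store_true')
parser.add_argument('-n', '--netcdf', dest='out_netcdf', action='store_true')

args = parser.parse_args(['--netcdf'])  # deliberately missing --out
if not args.out_tiff and not args.out_netcdf:
    parser.error('You must choose at least one output format, TIFF (--tiff) and/or netCDF (--netcdf)')
if args.out_netcdf and args.output_file is None:
    parser.error('--netcdf requires defining the output file with --out.')  # prints usage and exits
```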

start_date = None
try:
start_date = datetime.strptime(args.start_date, FileUtils.DATE_PATTERN_CONDENSED)
@@ -212,22 +231,17 @@
end_date_str = end_date.strftime(FileUtils.DATE_PATTERN_SEPARATED)
print_msg(f"End Date: {end_date_str}")

print_msg(f"Input Folder: {args.infolder}")
print_msg(f"Input Folder: {args.in_file_or_folder}")
print_msg(f"Overwrite Output: {args.overwrite_output}")
print_msg(f"Interpolation Mode: {args.interpolation_mode}")
print_msg(f"RAM Save Mode: {args.memory_save_mode}")
print_msg(f"Broadcasting: {args.use_broadcasting}")
if args.out_tiff:
print_msg("Output Type: TIFF")
print_msg(f"Output Folder: {args.infolder}")
print_msg("Output Type: netCDF")
print_msg(f"Output File: {args.output_file}")
print_msg(f"Processing Dates File: {args.processing_dates_file}")
print_msg(f"Config File: {config_filename}")

run(config_filename, args.infolder, args.output_file, args.processing_dates_file,
file_utils, args.out_tiff, args.overwrite_output, start_date, end_date, args.interpolation_mode,
args.use_broadcasting, args.memory_save_mode)
run(config_filename, args.in_file_or_folder, args.output_file, args.processing_dates_file, file_utils, args.out_tiff,
args.out_netcdf, args.overwrite_output, start_date, end_date, args.interpolation_mode, args.use_broadcasting,
args.memory_save_mode)
except Exception as e:
indent = len(program_name) * " "
sys.stderr.write(program_name + ": " + repr(e) + "\n")
106 changes: 97 additions & 9 deletions src/lisfloodutilities/gridding/lib/filters.py
@@ -1,38 +1,126 @@
from pathlib import Path
import pandas as pd
import re
from datetime import datetime as dt

class KiwisFilter():

def __init__(self, filter_args: dict):
def __init__(self, filter_columns: dict, filter_args: dict):
self.args = filter_args
self.filter_columns = filter_columns
self.stati = {"Active": 1, "Inactive": 0, "yes": 0, "no": 1, "Closed": 0, "Under construction": 0}
self.defaultReturn = 1
self.cur_timestamp = ''
print('ARGS: ', self.args)
self.COL_LAT = self.__get_column_name('COL_LAT', 'station_latitude')
self.COL_LON = self.__get_column_name('COL_LON', 'station_longitude')
self.COL_VALUE = self.__get_column_name('COL_VALUE', 'ts_value')
self.COL_QUALITY_CODE = self.__get_column_name('COL_QUALITY_CODE', 'q_code')
self.COL_STATION_DIARY_STATUS = self.__get_column_name('COL_STATION_DIARY_STATUS', 'station_diary_status')
self.COL_NO_GRIDDING = self.__get_column_name('COL_NO_GRIDDING', 'EFAS-ADDATTR-NOGRIDDING')
self.COL_IS_IN_DOMAIN = self.__get_column_name('COL_IS_IN_DOMAIN', 'EFAS-ADDATTR-ISINARCMINDOMAIN')
self.COL_EXCLUDE = self.__get_column_name('COL_EXCLUDE', 'EXCLUDE')
self.COL_INACTIVE_HISTORIC = self.__get_column_name('COL_INACTIVE_HISTORIC', 'INACTIVE_histattr')

self.OUTPUT_COLUMNS = [self.COL_LON, self.COL_LAT, self.COL_VALUE]


def filter(self, kiwis_files: array) -> array:
def filter(self, kiwis_files: list, kiwis_timestamps: list, kiwis_data_frames: list) -> list:
"""
Filter all kiwis files in the list and returns a list of the corresponding filtered pandas data frames.
If the kiwis_data_frames is not empty then filter the kiwis dataframes instead of the kiwis_files
"""
filtered_data_frames = []
i = 0
for file_path in kiwis_files:
df_kiwis = pd.read_csv(file_path, sep="\t")
if len(kiwis_data_frames) > 0:
df_kiwis = kiwis_data_frames[i]
else:
df_kiwis = pd.read_csv(file_path, sep="\t")
self.cur_timestamp = dt.strptime(f'{kiwis_timestamps[i]}00', "%Y%m%d%H%M%S")
df_kiwis = self.__apply_filter(df_kiwis)
filtered_data_frames.append(df_kiwis)
i += 1
return filtered_data_frames
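A hedged usage sketch of the new filter() signature: the paths and timestamps below are placeholders, and the kiwis file is assumed to contain the columns the filter expects.

```python
# Hedged usage sketch of KiwisFilter.filter(); paths and timestamps are
# placeholders, not files from the repository.
from pathlib import Path
from lisfloodutilities.gridding.lib.filters import KiwisFilter

filter_columns = {'COL_LAT': 'station_local_y', 'COL_LON': 'station_local_x'}
kiwis_filter = KiwisFilter(filter_columns=filter_columns, filter_args={})

kiwis_files = [Path('/data/kiwis/pr200102150600_all.kiwis')]
kiwis_timestamps = ['200102150600']  # '00' seconds are appended before parsing
filtered = kiwis_filter.filter(kiwis_files, kiwis_timestamps, kiwis_data_frames=[])
print(filtered[0].columns.tolist())  # only the lon/lat/value output columns remain
```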

def __get_column_name(self, column_arg_key: str, column_default_name: str):
return column_default_name if column_arg_key not in self.filter_columns else self.filter_columns[column_arg_key]

def __apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
# Get the code to filter kiwis leaving only the rows to be used for point file creation
return df
"""
Filter kiwis leaving only the rows to be used for point file creation
"""
# convert to string to make it easy to compare columns that have a mixture of string and number data
df = df.astype(str)
df = df.replace('nan', '')
# Translate status columns
df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] = df[self.COL_STATION_DIARY_STATUS].apply(self.rewritecol)
df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] = df[self.COL_INACTIVE_HISTORIC].apply(self.rewritecol)
# Apply filtering rules
df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == '40') | (df[f'{self.COL_QUALITY_CODE}'] == '120')) &
(df[f'{self.COL_NO_GRIDDING}'] == 'no') & (df[f'{self.COL_IS_IN_DOMAIN}'] == 'yes') &
(df[f'{self.COL_EXCLUDE}'] != 'yes') & (df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] == 1) &
(df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] == 1)]
return df[self.OUTPUT_COLUMNS]
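The boolean mask above keeps only rows with an accepted quality code, inside the domain, not excluded, and with an active station diary. A toy pandas illustration of the same idea on made-up data with the default column names (the diary-status checks are left out for brevity):

```python
# Toy illustration of the row filter on made-up data; real kiwis files carry
# many more columns and the station-diary checks are omitted here.
import pandas as pd

df = pd.DataFrame({
    'station_longitude': ['10.1', '10.2'],
    'station_latitude': ['45.0', '45.1'],
    'ts_value': ['1.4', '2.0'],
    'q_code': ['40', '255'],                        # only 40/120 are accepted
    'EFAS-ADDATTR-NOGRIDDING': ['no', 'no'],
    'EFAS-ADDATTR-ISINARCMINDOMAIN': ['yes', 'yes'],
    'EXCLUDE': ['', ''],
})

mask = (df['q_code'].isin(['40', '120'])
        & (df['EFAS-ADDATTR-NOGRIDDING'] == 'no')
        & (df['EFAS-ADDATTR-ISINARCMINDOMAIN'] == 'yes')
        & (df['EXCLUDE'] != 'yes'))
print(df.loc[mask, ['station_longitude', 'station_latitude', 'ts_value']])  # keeps only the first row
```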

def getStatus(self, status):
try:
curstate = int(self.stati[status])
return curstate
except:
return 0

def getNotStatus(self, status):
try:
curstate = int(not self.stati[status])
return curstate
except:
return 0

def rewritecol(self, cell_value: str):
return_code = self.defaultReturn
to_eval_timestamp = self.cur_timestamp
status_strings = None

if cell_value:
status_strings = cell_value.split("<br>")
datetime_list = []
status_list = []
if status_strings:
for crnt_string in status_strings:
date_str, status = re.match("(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d): (.*)", crnt_string).groups()
datetime_list.append(dt.strptime(date_str, "%Y-%m-%d %H:%M:%S"))
status_list.append(status)
if datetime_list:
if to_eval_timestamp < datetime_list[0]:
"""Timestamp is before the first timestamp, return code should be the NOT of status"""
return_code = self.getNotStatus(status_list[0])
elif to_eval_timestamp >= datetime_list[-1]:
"""Timestamp is after the last timestamp, using last timestamp as value"""
return_code = self.getStatus(status_list[-1])
else:
for i in range(0, len(datetime_list)):
if i != len(datetime_list) - 1:
if datetime_list[i] <= to_eval_timestamp < datetime_list[i+1]:
return_code = self.getStatus(status_list[i])
return return_code
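rewritecol() evaluates a station diary made of "timestamp: status" entries joined by "<br>" and returns 1 when the station should be kept at the current timestamp. A worked example with an illustrative diary string:

```python
# Worked example of the diary-status logic with an illustrative status string.
from datetime import datetime as dt
from lisfloodutilities.gridding.lib.filters import KiwisFilter

kf = KiwisFilter(filter_columns={}, filter_args={})
diary = '2001-01-01 00:00:00: Active<br>2010-06-01 00:00:00: Closed'

kf.cur_timestamp = dt(2005, 2, 15, 6, 0)
print(kf.rewritecol(diary))  # 1 -> "Active" is in force in 2005, station kept

kf.cur_timestamp = dt(2015, 2, 15, 6, 0)
print(kf.rewritecol(diary))  # 0 -> "Closed" is the last entry before 2015
```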


# KIWIS_FILTER_PLUGIN_CLASSES = {'DowgradedObservationsKiwisFilter': {'1295': 1.0}, 'ObservationsKiwisFilter': {'1303': 100.0}}

class DowgradedObservationsKiwisFilter(KiwisFilter):

def __init__(self, filter_args: dict):
super().__init__(filter_args)
def __init__(self, filter_columns: dict, filter_args: dict):
super().__init__(filter_columns, filter_args)

def __apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
df = super().__apply_filter(df)
return df

class ObservationsKiwisFilter(KiwisFilter):

def __init__(self, filter_args: dict):
super().__init__(filter_args)
def __init__(self, filter_columns: dict, filter_args: dict):
super().__init__(filter_columns, filter_args)

def __apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
df = super().__apply_filter(df)
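The commented-out example above shows how KIWIS_FILTER_PLUGIN_CLASSES names the filter classes and their arguments. A hedged sketch of how that mapping could be turned into filter instances (the loader side of this is not part of the hunks shown here):

```python
# Hedged sketch: resolve each configured class name inside filters.py and
# instantiate it with the shared column mapping and its own argument dict.
import importlib

filter_columns = {'COL_LAT': 'station_local_y', 'COL_LON': 'station_local_x'}
plugin_classes = {'KiwisFilter': {}, 'ObservationsKiwisFilter': {'1303': 100.0}}

filters_module = importlib.import_module('lisfloodutilities.gridding.lib.filters')
plugins = [
    getattr(filters_module, class_name)(filter_columns, class_args)
    for class_name, class_args in plugin_classes.items()
]
print([type(p).__name__ for p in plugins])  # -> ['KiwisFilter', 'ObservationsKiwisFilter']
```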
