From 95d86e20491b37663f14e881a7215c0b0819b985 Mon Sep 17 00:00:00 2001
From: Heiko Klein
Date: Sun, 9 Jun 2024 16:33:22 +0200
Subject: [PATCH] add netcdf_rw handler
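
netcdf_rw is a writable pyaro-database: the content of other pyaro-readers
can be merged into a directory of yearly netcdf-files and read back through
the normal pyaro interface. A minimal usage sketch (the paths are
placeholders; the source engine can be any installed pyaro-reader):

    import pyaro

    with pyaro.open_timeseries("netcdf_rw", "/tmp/db", mode="w", filters=[]) as db:
        with pyaro.open_timeseries(
            "ascii2netcdf", "Ebas_converted/", resolution="daily", filters=[]
        ) as source:
            db.add(source)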
---
 .gitignore                                  |   3 +-
 setup.cfg                                   |  12 +-
 .../ascii2netcdf/Ascii2NetcdfTimeseries.py  |  11 +-
 .../netcdf_rw/Netcdf_RWTimeseries.py        | 335 ++++++++++++++++++
 src/pyaro_readers/netcdf_rw/__init__.py     |   4 +
 tests/test_Netcdf_RWTimeseries.py           |  73 ++++
 6 files changed, 427 insertions(+), 11 deletions(-)
 create mode 100644 src/pyaro_readers/netcdf_rw/Netcdf_RWTimeseries.py
 create mode 100644 src/pyaro_readers/netcdf_rw/__init__.py
 create mode 100644 tests/test_Netcdf_RWTimeseries.py

diff --git a/.gitignore b/.gitignore
index d42c243..b150ecc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,6 +50,7 @@ coverage.xml
 .hypothesis/
 .pytest_cache/
 cover/
+tmp_netcdf_rw/
 
 # Translations
 *.mo
@@ -159,4 +160,4 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
-.vscode/
\ No newline at end of file
+.vscode/
diff --git a/setup.cfg b/setup.cfg
index f4414e4..4217c98 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,15 +1,15 @@
 [metadata]
 name = pyaro_readers
-version = 0.0.8
+version = 0.0.9.dev0
 author = MET Norway
 description = implementations of pyaerocom reading plugings using pyaro as interface
 long_description = file: README.md
 long_description_content_type = text/markdown
 classifiers =
     Programming Language :: Python :: 3 :: Only
-    Programming Language :: Python :: 3.9
     Programming Language :: Python :: 3.10
     Programming Language :: Python :: 3.11
+    Programming Language :: Python :: 3.12
     License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)
     Operating System :: OS Independent
     Development Status :: 3 - Alpha
@@ -19,9 +19,9 @@ classifiers =
 url = https://github.com/metno/pyaro-readers
 
 [options]
-python_version = >=3.9
+python_version = >=3.10
 install_requires =
-    pyaro >= 0.0.8
+    pyaro >= 0.0.10
     geocoder_reverse_natural_earth >= 0.0.2
     netCDF4
     requests
@@ -48,9 +48,11 @@ pyaro.timeseries =
     aeronetsunreader = pyaro_readers.aeronetsunreader:AeronetSunTimeseriesEngine
     aeronetsdareader = pyaro_readers.aeronetsdareader:AeronetSdaTimeseriesEngine
     ascii2netcdf = pyaro_readers.ascii2netcdf:Ascii2NetcdfTimeseriesEngine
-    nilupmfebas = pyaro_readers.nilupmfebas:EbasPmfTimeseriesEngine
+    netcdf_rw = pyaro_readers.netcdf_rw:Netcdf_RWTimeseriesEngine
     harp = pyaro_readers.harpreader:AeronetHARPEngine
     nilupmfabsorption = pyaro_readers.nilupmfabsorptionreader:NILUPMFAbsorptionTimeseriesEngine
+    nilupmfebas = pyaro_readers.nilupmfebas:EbasPmfTimeseriesEngine
+
 
 [tox:tox]
diff --git a/src/pyaro_readers/ascii2netcdf/Ascii2NetcdfTimeseries.py b/src/pyaro_readers/ascii2netcdf/Ascii2NetcdfTimeseries.py
index d9c5ba1..674166f 100644
--- a/src/pyaro_readers/ascii2netcdf/Ascii2NetcdfTimeseries.py
+++ b/src/pyaro_readers/ascii2netcdf/Ascii2NetcdfTimeseries.py
@@ -163,10 +163,11 @@ def _get_data_from_ncfile(self, varname, file, data):
             if not epdl in nc.variables:
                 return
             vdata = np.ma.filled(nc[epdl][:], np.nan)
-            if nc[epdl].units != data.units:
-                logger.warning(
-                    f"units-change for {varname} in {file}: {nc[epdl].units} != {data.units}"
-                )
+            if "units" in nc[epdl].ncattrs():
+                if nc[epdl].units != data.units:
+                    logger.warning(
+                        f"units-change for {varname} in {file}: {nc[epdl].units} != {data.units}"
+                    )
             # get all arrays into same size, time fastes moving, i.e.
             # [station][time]
             dstruct = {}
@@ -203,7 +204,7 @@ def _get_data_from_ncfile(self, varname, file, data):
         for key in dstruct.keys():
             dstruct[key] = dstruct[key][idx]
 
-        dstruct["flags"] = dstruct["data"].astype("i4")
+        dstruct["flags"] = np.zeros(len(dstruct["data"]), "i4")
         dstruct["flags"][:] = Flag.VALID
 
         data.append(
diff --git a/src/pyaro_readers/netcdf_rw/Netcdf_RWTimeseries.py b/src/pyaro_readers/netcdf_rw/Netcdf_RWTimeseries.py
new file mode 100644
index 0000000..efa8dd0
--- /dev/null
+++ b/src/pyaro_readers/netcdf_rw/Netcdf_RWTimeseries.py
@@ -0,0 +1,335 @@
+import datetime
+import glob
+import inspect
+import json
+import logging
+import os
+
+import netCDF4
+import numpy as np
+from pyaro.timeseries import (
+    AutoFilterReaderEngine,
+    Data,
+    NpStructuredData,
+    Station,
+)
+import pyaro.timeseries.Filter
+
+logger = logging.getLogger(__name__)
+
+
+class Netcdf_RWTimeseriesException(Exception):
+    pass
+
+
+class Netcdf_RWTimeseriesReader(AutoFilterReaderEngine.AutoFilterReader):
+    """Initialize/open a new reader for netcdf-files
+
+    :param filename: directory name for pyaro_netcdf_rw.YYYY.nc files, e.g.
+        /lustre/storeB/users/heikok/Ebas_converted/
+    :param mode: 'r' for read-only, 'w' for writable
+    :param filters: list of filters, defaults to []
+        files are parsed per year, so adding a pyaro.timeseries.Filter.TimeBoundsFilter
+        is an advantage
+    """
+
+    ncfile_prefix = "pyaro_netcdf_rw"
+
+    def __init__(
+        self,
+        filename,
+        mode="r",
+        filters=[],
+    ):
+        self._set_filters(filters)
+        self._mode = mode
+        if os.path.isdir(filename):
+            self._directory = filename
+        elif mode == "r":
+            raise Netcdf_RWTimeseriesException(
+                f"no such file or directory: {filename}"
+            )
+        else:
+            os.makedirs(filename)
+            self._directory = filename
+
+        dataglob = os.path.join(self._directory, f"{self.ncfile_prefix}.????.nc")
+        self._years = set()
+        for file in glob.iglob(dataglob):
+            year = file[-7:-3]
+            if self._is_year_in_filters(year):
+                self._years.add(year)
+
+        try:
+            self._variables = self._read_json("variables.json", [])
+            self._metadata = self._read_json("metadata.json", {})
+            self._stations = self._read_stations()
+        except Exception as ex:
+            raise Netcdf_RWTimeseriesException(f"unable to read definition-file: {ex}")
+        return
+
+    def _read_json(self, file, empty):
+        filepath = os.path.join(self._directory, file)
+        res = empty
+        if os.path.exists(filepath):
+            with open(filepath, "r") as fh:
+                res = json.load(fh)
+        return res
+
+    def _write_json(self, obj, file):
+        filepath = os.path.join(self._directory, file)
+        with open(filepath, "w") as fh:
+            json.dump(obj, fh)
+        return
+
+    def _read_stations(self) -> dict[str, Station]:
+        stat_dict = {}
+        for stat, stat_kwargs in self._read_json("stations.json", {}).items():
+            stat_dict[stat] = Station(**stat_kwargs)
+        return stat_dict
+
+    def _write_stations(self):
+        stat_obj = {}
+        for stat, station in self.stations().items():
+            if pyaro.__version__ > "0.0.10":
+                stat_obj[stat] = station.init_kwargs()
+            else:
+                stat_obj[stat] = {
+                    "fields": station._fields,
+                    "metadata": station.metadata,
+                }
+        self._write_json(stat_obj, "stations.json")
+
+    def _is_year_in_filters(self, year):
+        start_year = np.datetime64(f"{year}-01-01 00:00:00")
+        end_year = np.datetime64(f"{year}-12-31 23:59:59")
+        time_filter = pyaro.timeseries.Filter.TimeBoundsFilter()
+        for fil in self._get_filters():
+            if isinstance(fil, pyaro.timeseries.Filter.TimeBoundsFilter):
+                time_filter = fil
+        if time_filter.has_envelope():
+            start, end = time_filter.envelope()
+            if end_year < start:
+                return False
+            if end < start_year:
+                return False
+        return True
+
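+    # On-disk layout, as written by _update_ncfile below: each variable gets an
+    # index `pos` in the global `variable_names` attribute and its own dimension
+    # `dim_{pos}`, holding the per-observation arrays `values_{pos}`,
+    # `stations_{pos}`, `latitudes_{pos}`, `longitudes_{pos}`, `altitudes_{pos}`,
+    # `start_times_{pos}`, `end_times_{pos}`, `flags_{pos}` and
+    # `standard_deviations_{pos}`. The variable-, station- and metadata-definitions
+    # live next to the nc-files in variables.json, stations.json and metadata.json.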
+    def _get_data_from_ncfile(
+        self, varname, file, data: NpStructuredData
+    ) -> NpStructuredData:
+        with netCDF4.Dataset(file, "r") as nc:
+            if varname not in nc.variable_names:
+                logger.info(f"{varname} not in file {file}")
+                return data
+            pos = nc.variable_names.index(varname)
+            if f"start_times_{pos}" not in nc.variables:
+                logger.info(f"{varname} not in file {file}, pos {pos}")
+                return data
+
+            start_times = netCDF4.num2date(
+                nc[f"start_times_{pos}"][:], nc[f"start_times_{pos}"].units
+            )
+            end_times = netCDF4.num2date(
+                nc[f"end_times_{pos}"][:], nc[f"end_times_{pos}"].units
+            )
+            data_name = f"values_{pos}"
+            data_is_new = len(data) == 0 and data.units == ""
+            if data_is_new:
+                data = NpStructuredData(varname, nc[data_name].units)
+            if data.units != "" and nc[data_name].units != data.units:
+                logger.warning(
+                    f"units-change for {varname}/{data_name} in {file}: {nc[data_name].units} != {data.units}"
+                )
+
+            data.append(
+                nc[data_name][:].filled(np.nan),
+                nc[f"stations_{pos}"][:].astype("U64"),
+                nc[f"latitudes_{pos}"][:].filled(np.nan),
+                nc[f"longitudes_{pos}"][:].filled(np.nan),
+                nc[f"altitudes_{pos}"][:].filled(np.nan),
+                start_times,
+                end_times,
+                nc[f"flags_{pos}"][:].filled(-32767),
+                nc[f"standard_deviations_{pos}"][:].filled(np.nan),
+            )
+        return data
+
+    def _tmp_and_real_ncfilename(self, year):
+        tmpfile = os.path.join(
+            self._directory, f"{self.ncfile_prefix}.{year}.nc.{os.getpid()}"
+        )
+        file = os.path.join(self._directory, f"{self.ncfile_prefix}.{year}.nc")
+        return (tmpfile, file)
+
+    def _tmp_ncfile(self, year, readerstr):
+        tmpfile, file = self._tmp_and_real_ncfilename(year)
+        tmpnc = netCDF4.Dataset(tmpfile, "w", format="NETCDF4")
+        if os.path.exists(file):
+            with netCDF4.Dataset(file, "r") as nc:
+                vars = nc.variable_names
+                for var in self.variables():
+                    if var not in vars:
+                        vars.append(var)
+                tmpnc.variable_names = vars
+                oldhistory = nc.history
+                if isinstance(oldhistory, str):
+                    oldhistory = [oldhistory]
+                tmpnc.history = oldhistory + [
+                    f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} updated with netcdf_rw from {readerstr}"
+                ]
+        else:
+            tmpnc.variable_names = self.variables()
+            tmpnc.history = [
+                f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} creation with netcdf_rw from {readerstr}"
+            ]
+        return tmpnc
+
+    def _update_ncfile(self, nc, year, data):
+        """write the data to a nc-file, creating the variables if needed
+
+        :param nc: writable nc-file with a variable_names global attribute
+        :param year: year of the data
+        :param data: filtered data for one year
+        """
+        var_name = data.variable
+        units = data.units
+        if var_name not in nc.variable_names:
+            raise Netcdf_RWTimeseriesException(
+                f"{var_name} not in {nc.variable_names} for {year}"
+            )
+        pos = nc.variable_names.index(var_name)
+        dim_name = f"dim_{pos}"
+        nc.createDimension(dim_name, len(data))
+        for x in data.keys():
+            if "time" in x:
+                var = nc.createVariable(
+                    f"{x}_{pos}", np.int64, (dim_name,), compression="zlib"
+                )
+                var.units = "seconds since 1970-01-01 00:00:00 +00:00"
+                var[:] = data[x].astype("datetime64[s]").astype("int64")
+            else:
+                compression = "zlib"
+                if x == "stations":
+                    # variable-length strings cannot be zlib-compressed
+                    compression = None
+                var = nc.createVariable(
+                    f"{x}_{pos}", data[x].dtype, (dim_name,), compression=compression
+                )
+                if x == "altitudes":
+                    var.units = "m"
+                    var.standard_name = "altitude"
+                    var.positive = "up"
+                if x == "longitudes":
+                    var.units = "degrees_east"
+                if x == "latitudes":
+                    var.units = "degrees_north"
+                if x == "values":
+                    var.units = units
+                    var.long_name = var_name
+                    var.coordinates = (
+                        f"longitudes_{pos} latitudes_{pos} altitudes_{pos}"
+                    )
+                var[:] = data[x]
+
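+    # Updates never touch the live files directly: add() rewrites the affected
+    # years from the merged data into pyaro_netcdf_rw.YYYY.nc.<pid> (see
+    # _tmp_and_real_ncfilename) and os.rename()s each finished file over the
+    # real pyaro_netcdf_rw.YYYY.nc, so readers never see a half-written file.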
+    def add(self, reader: pyaro.timeseries.Reader):
+        """add content of another reader to this netcdf_rw database
+
+        All content will be added, except for duplicates, which will be removed.
+
+        :param reader: another pyaro-Reader, including its filters
+        """
+        if self._mode == "r":
+            raise Netcdf_RWTimeseriesException(
+                f"add() not allowed on readonly (mode='{self._mode}') data-dir"
+            )
+        self._metadata = reader.metadata() | self._metadata
+        self._stations = reader.stations() | self._stations
+        org_variables = self._variables
+        self._variables = list(set(org_variables + reader.variables()))
+
+        ncfiles = {}
+        for var in reader.variables():
+            logger.info(f"adding variable {var}")
+            rdata = reader.data(var)
+            if var in org_variables:
+                data = self.data(var)
+                if data.units != rdata.units:
+                    raise Netcdf_RWTimeseriesException(
+                        f"change of unit for variable {var} from {data.units} to {rdata.units}"
+                    )
+                data.append(
+                    rdata.values,
+                    rdata.stations,
+                    rdata.latitudes,
+                    rdata.longitudes,
+                    rdata.altitudes,
+                    rdata.start_times,
+                    rdata.end_times,
+                    rdata.flags,
+                    rdata.standard_deviations,
+                )
+            else:
+                data = rdata
+            data = pyaro.timeseries.Filter.DuplicateFilter().filter_data(
+                data, self.stations(), self.variables()
+            )
+            min_year = (
+                np.min(data.start_times).astype("datetime64[Y]").astype(int) + 1970
+            )
+            max_year = (
+                np.max(data.end_times).astype("datetime64[Y]").astype(int) + 1970
+            )
+            for year in range(min_year, max_year + 1):
+                if year not in ncfiles:
+                    ncfiles[year] = self._tmp_ncfile(year, str(reader))
+                ydata = pyaro.timeseries.Filter.TimeBoundsFilter(
+                    startend_include=[
+                        (f"{year}-01-01 00:00:00", f"{year}-12-31 23:59:59")
+                    ]
+                ).filter_data(data, self.stations(), self.variables())
+                self._update_ncfile(ncfiles[year], year, ydata)
+
+        # make the new files available
+        for year in ncfiles:
+            ncfiles[year].close()
+            tmpfile, file = self._tmp_and_real_ncfilename(year)
+            os.rename(tmpfile, file)
+        self._write_stations()
+        self._write_json(self.metadata(), "metadata.json")
+        self._write_json(self._variables, "variables.json")
+        return
+
+    def _unfiltered_data(self, varname) -> Data:
+        data = NpStructuredData(varname, "")
+        for year in self._years:
+            file = os.path.join(self._directory, f"{self.ncfile_prefix}.{year}.nc")
+            if not os.path.exists(file):
+                logger.info(f"no datafile for {year} like {file}, skipping...")
+                continue
+            data = self._get_data_from_ncfile(varname, file, data)
+        return data
+
+    def _unfiltered_stations(self) -> dict[str, Station]:
+        return self._stations
+
+    def _unfiltered_variables(self) -> list[str]:
+        return self._variables
+
+    def close(self):
+        pass
+
+
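+# The engine is registered under the pyaro.timeseries entry point "netcdf_rw"
+# in setup.cfg, so it is discoverable via pyaro.list_timeseries_engines() and
+# pyaro.open_timeseries("netcdf_rw", ...).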
+class Netcdf_RWTimeseriesEngine(AutoFilterReaderEngine.AutoFilterEngine):
+    """reader and writer for pyaro-timeseries stored as yearly netcdf-files
+    (pyaro_netcdf_rw.YYYY.nc), e.g. a local collection merged from other
+    pyaro-readers with Netcdf_RWTimeseriesReader.add()"""
+
+    def reader_class(self):
+        return Netcdf_RWTimeseriesReader
+
+    def open(self, filename, *args, **kwargs) -> Netcdf_RWTimeseriesReader:
+        return self.reader_class()(filename, *args, **kwargs)
+
+    def description(self) -> str:
+        return inspect.getdoc(self)
+
+    def url(self):
+        return "https://github.com/metno/pyaro-readers"
diff --git a/src/pyaro_readers/netcdf_rw/__init__.py b/src/pyaro_readers/netcdf_rw/__init__.py
new file mode 100644
index 0000000..a32af5b
--- /dev/null
+++ b/src/pyaro_readers/netcdf_rw/__init__.py
@@ -0,0 +1,4 @@
+from .Netcdf_RWTimeseries import (
+    Netcdf_RWTimeseriesReader,
+    Netcdf_RWTimeseriesEngine,
+)
diff --git a/tests/test_Netcdf_RWTimeseries.py b/tests/test_Netcdf_RWTimeseries.py
new file mode 100644
index 0000000..c632133
--- /dev/null
+++ b/tests/test_Netcdf_RWTimeseries.py
@@ -0,0 +1,73 @@
+import logging
+import os
+from shutil import rmtree
+import sys
+import unittest
+
+import pyaro
+import pyaro.timeseries
+
+EBAS_URL = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)), "testdata", "NILU"
+)
+
+
+class TestNetcdf_RWTimeSeries(unittest.TestCase):
+    engine = "ascii2netcdf"
+    rwengine = "netcdf_rw"
+    rwdir = "tmp_netcdf_rw"
+
+    def setUp(self) -> None:
+        logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+        logging.getLogger().setLevel(logging.DEBUG)
+
+        os.makedirs(self.rwdir, exist_ok=True)
+        return super().setUp()
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        if os.path.exists(cls.rwdir):
+            rmtree(cls.rwdir)
+        return super().tearDownClass()
+
+    def test_0engine(self):
+        self.assertIn(self.rwengine, pyaro.list_timeseries_engines())
+
+    def test_1write(self):
+        with pyaro.open_timeseries(
+            self.rwengine, self.rwdir, mode="w", filters=[]
+        ) as ts_rw:
+            with pyaro.open_timeseries(
+                self.engine, EBAS_URL, resolution="daily", filters=[]
+            ) as ts:
+                self.assertGreater(len(ts.variables()), 70)
+                self.assertGreater(len(ts.stations()), 300)
+                ts_rw.add(ts)
+                self.assertEqual(len(ts.variables()), len(ts_rw.variables()))
+                self.assertEqual(len(ts.stations()), len(ts_rw.stations()))
+
+    def test_2open(self):
+        with pyaro.open_timeseries(
+            self.rwengine, self.rwdir, mode="r", filters=[]
+        ) as ts_rw:
+            with pyaro.open_timeseries(
+                self.engine, EBAS_URL, resolution="daily", filters=[]
+            ) as ts:
+                self.assertEqual(len(ts.variables()), len(ts_rw.variables()))
+                self.assertEqual(len(ts.stations()), len(ts_rw.stations()))
+
+    def test_3write(self):
+        # write same data again, should not increase
+        with pyaro.open_timeseries(
+            self.rwengine, self.rwdir, mode="w", filters=[]
+        ) as ts_rw:
+            with pyaro.open_timeseries(
+                self.engine, EBAS_URL, resolution="daily", filters=[]
+            ) as ts:
+                self.assertGreater(len(ts.variables()), 70)
+                self.assertGreater(len(ts.stations()), 300)
+                ts_rw.add(ts)
+                self.assertEqual(len(ts.variables()), len(ts_rw.variables()))
+                self.assertEqual(len(ts.stations()), len(ts_rw.stations()))
+
+
+if __name__ == "__main__":
+    unittest.main()