Skip to content

Commit

Permalink
WIP: potential filtering problem with pyaro
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Griesfeller committed Sep 11, 2024
1 parent d140802 commit 0692e97
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 35 deletions.
105 changes: 86 additions & 19 deletions src/pyaro_readers/actrisebas/ActrisEbasReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
DEFINITION_FILE = os.path.join(
os.path.dirname(os.path.realpath(__file__)), DEFINITION_FILE_BASENAME
)
# name of the standard_name section in the DEFINITION_FILE
STD_NAME_SECTION_NAME = "actris_standard_names"

# number of times an api request is tried before we consider it failed
MAX_RETRIES = 2
Expand All @@ -54,42 +56,58 @@
TIME_VAR_NAME = ["time"]



class ActrisEbasRetryException(Exception):
pass


class ActrisEbasStdNameNotFoundException(Exception):
pass


class ActrisEbasQcVariableNotFoundException(Exception):
pass


class ActrisEbasTestDataNotFoundException(Exception):
pass


class ActrisEbasTimeSeriesReader(AutoFilterReaderEngine.AutoFilterReader):
def __init__(
self,
vars_to_read: list[str] = None,
filters=[],
tqdm_desc: str | None = None,
ts_type: str = "daily",
test_flag: bool = True,
):
""" """
self._filename = None
self.vars_to_read = vars_to_read
self._stations = {}
self._urls_to_dl = {}
self._data = {} # var -> {data-array}
self._set_filters(filters)
self._header = []
self._metadata = {}
self._standard_names = {}
_laststatstr = ""
self._revision = datetime.datetime.min
self._revision = datetime.datetime.now()
self._metadata["revision"] = datetime.datetime.strftime(
self._revision, "%y%m%d%H%M%S"
)

try:
self.vars_to_read = filters["variables"]["include"]
except KeyError:
raise ValueError(
f"As of now, you have to give the species you want to read in filter.variables.include"
)
if "variables" in filters:
if "include" in filters["variables"]:
self.vars_to_read = filters["variables"]["include"]
logger.info(f"applying variable include filter {vars_to_read}...")
# try:
# self.vars_to_read = filters["variables"]["include"]
# except KeyError:
# raise ValueError(
# f"As of now, you have to give the species you want to read in filter.variables.include"
# )

try:
self.sites_to_read = filters["stations"]["include"]
Expand All @@ -105,6 +123,7 @@ def __init__(
self._def_data = self._read_definitions(file=DEFINITION_FILE)
for var in self.vars_to_read:
self._metadata[var] = {}
self._standard_names[var] = self.get_actris_standard_name(var)
# for testing since the API is error-prone and slow at the time of this writing
test_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
Expand All @@ -128,13 +147,15 @@ def __init__(
sites_to_read=self.sites_to_read,
sites_to_exclude=self.sites_to_exclude,
)
self._data[var] = self.read_data(self._urls_to_dl[var])
self.read_data(var, self._urls_to_dl[var])
# assert self.data(self.vars_to_read[0])

def metadata(self):
return self._metadata

def read_data(
self,
actris_variable: str,
urls_to_dl: dict,
tqdm_desc="reading stations",
):
Expand Down Expand Up @@ -164,17 +185,25 @@ def read_data(
):
# the naming of the variable in the file does not reflect the vocabulary naming ot pyaerocom's
# naming
ret_data_var = _data_var.copy()
# ret_data_var = _data_var.copy()
# if ret_data_var not in self.vars_to_read and :
# # we need

# look for a standard_name match and return only that variable
std_name = self._standard_names[actris_variable]
if self.get_ebas_data_standard_name(tmp_data, _data_var) != self._standard_names[actris_variable]:
logger.info(f"file #{d_idx: }skipping variable {_data_var} due to wrong standard name")
print(f"file #{d_idx: } skipping variable {_data_var} due to wrong standard name")
continue

vals = tmp_data[_data_var].values
flags = np.full(ts_no, Flag.VALID)
if _data_var not in self._data:
self._data[_data_var] = NpStructuredData(
_data_var, self.get_ebas_data_units(tmp_data, _data_var)
if actris_variable not in self._data:
self._data[actris_variable] = NpStructuredData(
actris_variable, self.get_ebas_data_units(tmp_data, _data_var)
)

self._data[_data_var].append(
self._data[actris_variable].append(
value=vals,
station=station,
latitude=lat,
Expand All @@ -187,11 +216,11 @@ def read_data(
standard_deviation=standard_deviation,
)
# make sure to return something in the user given variable name for now
try:
if _data_var != self.vars_to_read[d_idx]:
self._data[self.vars_to_read[d_idx]] = self._data[_data_var]
except IndexError:
pass
# try:
# if _data_var != self.vars_to_read[d_idx]:
# self._data[self.vars_to_read[d_idx]] = self._data[_data_var]
# except IndexError:
# pass
if not site_name in self._stations:
self._stations[site_name] = Station(
{
Expand All @@ -201,7 +230,7 @@ def read_data(
"altitude": altitude[0],
"country": self.get_ebas_data_country_code(tmp_data),
"url": "",
"long_name": site_name,
"long_name": long_name,
}
)
bar.update(1)
Expand All @@ -212,10 +241,48 @@ def get_ebas_data_units(self, tmp_data, var_name):
unit = tmp_data[var_name].attrs["units"]
return unit

def get_ebas_data_standard_name(self, tmp_data, var_name):
"""small helper method to get the ebas standard_name for a given variable from the data file"""
ret_data = tmp_data[var_name].attrs["standard_name"]
return ret_data

def get_ebas_data_ancillary_variables(self, tmp_data, var_name):
"""
small helper method to get the ebas ancillary variables from the data file
These contain the data flags (hopefully always ending with "_qc" and additional metedata
(hopefully always ending with "_ebasmetadata" for each time step
"""
ret_data = tmp_data[var_name].attrs["ancillary_variables"]
return ret_data

def get_ebas_data_qc_variable(self, tmp_data, var_name):
"""
small helper method to get the ebas quality control variable name
for a given variable name in the ebas data file
uses self.get_ebas_data_ancillary_variables to get the variable names of the
ancillary variables
"""
ret_data = None
for var in self.get_ebas_data_ancillary_variables(tmp_data, var_name):
for time_name in TIME_VAR_NAME:
if time_name in tmp_data[var_name].dims:
ret_data = var_name
break
if ret_data is None:
raise ActrisEbasQcVariableNotFoundException(f"Error: no flag data for variable {var_name} found!")
return ret_data

def get_ebas_data_country_code(self, tmp_data):
"""small helper method to get the ebas country code from the data file"""
return tmp_data.attrs["ebas_station_code"][0:2]

def get_actris_standard_name(self, actris_var_name):
"""small helper method to get corresponding CF standard name for a given ACTRIS variable"""
try:
return self._def_data[STD_NAME_SECTION_NAME][actris_var_name]
except KeyError:
raise ActrisEbasStdNameNotFoundException(f"Error: no CF standard name for {actris_var_name} found!")

def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = None):
"""
small helper method to isolate potential data variables
Expand Down
14 changes: 12 additions & 2 deletions src/pyaro_readers/actrisebas/definitions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,41 @@
#ebas_matrix = [<lust of ebas matrix names>]

[variables.conco3]
actris_variable = ["ozone mass concentration", "ozone amount fraction"]
actris_variable = ["ozone mass concentration"]
actris_matrix = ["gas phase"]
ebas_component = ["ozone"]
ebas_matrix = ["air"]
ebas_unit = "ug/m3"
standard_name = "mass_concentration_of_ozone_in_air"

[variables.vmro3]
actris_variable = ["ozone mass concentration", "ozone amount fraction"]
actris_matrix = ["gas phase"]
ebas_component = ["ozone"]
ebas_matrix = ["air"]
ebas_unit = "nmol/mol"
standard_name = "mole_fraction_of_ozone_in_air"

[variables.concso4]
actris_variable = ["aerosol particle sulphate mass concentration"]
actris_matrix = ["aerosol particle phase", "PM10", "PM2.5"]
ebas_component = ["sulphate_corrected", "sulphate_total"]
ebas_matrix = ["aerosol", "pm10", "pm25"]
ebas_unit = "ug/m3"
standard_name = "mass_concentration_of_sulfate_aerosol_in_air"

[actris_std_units]
# tell the reader which unit shall be returned in case the same property is available in several units
# e.g. ozone mass concentration is available in the files in [ug/m3] and [nmol/mol]
# make sure to use ACTRIS-EBAS unit notation since this is a simple string match
# For variables not noted here, we will return the first unit found inb the first data file
"ozone mass concentration" = "nmol/mol"
"ozone mass concentration" = "ug/m3"
"ozone amount fraction" = "nmol/mol"
"aerosol particle sulphate mass concentration" = "ug/m3"

[actris_standard_names]
# match between ACTRIS vocabulary and CF standard name
# needed to match the variables in multicolumn files since the netcdf variable name has no meaning
"ozone mass concentration" = "mass_concentration_of_ozone_in_air"
"ozone amount fraction" = "mole_fraction_of_ozone_in_air"
"aerosol particle sulphate mass concentration" = "mass_concentration_of_sulfate_aerosol_in_air"
33 changes: 19 additions & 14 deletions tests/test_ActrisEbasReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pyaro
import pyaro.timeseries
from pyaro.timeseries.Wrappers import VariableNameChangingReader

TEST_URL = "https://prod-actris-md.nilu.no/Version"
VOCABULARY_URL = "https://prod-actris-md.nilu.no/V"
Expand All @@ -11,6 +12,11 @@
class TestActrisEbasTimeSeriesReader(unittest.TestCase):
engine = "actrisebas"

station_filter = {
"stations": {"include": ["Birkenes II", "Jungfraujoch"]},
}
vars_to_read = ["ozone mass concentration"]

def test_api_online(self, url=TEST_URL):
try:
req = urllib.request.Request(TEST_URL, method="HEAD")
Expand Down Expand Up @@ -48,19 +54,16 @@ def test_init(self):
def test_api_reading_small_data_set(self):
# test access to the EBAS API
filters = {
"variables": {
"include": [
"ozone mass concentration",
]
},
"stations": {"include": ["Birkenes II", "Jungfraujoch"]},
}
engine = pyaro.list_timeseries_engines()[self.engine]
#
with engine.open(
filters=filters,
filters=filters, vars_to_read=["ozone mass concentration"],
) as ts:
self.assertGreaterEqual(len(ts.variables()), 1)
self.assertEqual(len(ts.stations()), 2)
self.assertIn("revision", ts.metadata())

def test_api_reading_pyaerocom_naming(self):
# test access to the EBAS API
Expand All @@ -76,18 +79,20 @@ def test_api_reading_pyaerocom_naming(self):
#
with engine.open(
filters=filters,
vars_to_read=["vmro3"],
) as ts:
self.assertGreaterEqual(len(ts.variables()), 1)

#
# def test_wrappers(self):
# engine = pyaro.list_timeseries_engines()["aeronetsunreader"]
# new_var_name = "od500aer"
# with VariableNameChangingReader(
# engine.open(self.file, filters=[]), {"AOD_500nm": new_var_name}
# ) as ts:
# self.assertEqual(ts.data(new_var_name).variable, new_var_name)
# pass
def test_wrappers(self):
engine = pyaro.list_timeseries_engines()[self.engine]
new_var_name = "vmro3"
with VariableNameChangingReader(
engine.open(vars_to_read=self.vars_to_read, filters=self.station_filter),
{self.vars_to_read[0]: new_var_name}
) as ts:
self.assertEqual(ts.data(new_var_name).variable, new_var_name)
pass
#


Expand Down

0 comments on commit 0692e97

Please sign in to comment.