From 9110d4c62b8321ac7c20b42daa841a8ed3a10bc3 Mon Sep 17 00:00:00 2001
From: Daniel
Date: Thu, 25 Jul 2024 19:08:36 +0200
Subject: [PATCH] First small steps towards eea reader

---
 .../eeareader/EEATimeseriesReader.py         | 267 ++++++++++++++++++
 src/pyaro_readers/eeareader/eeadownloader.py |  65 +++++
 2 files changed, 332 insertions(+)
 create mode 100644 src/pyaro_readers/eeareader/EEATimeseriesReader.py
 create mode 100644 src/pyaro_readers/eeareader/eeadownloader.py

diff --git a/src/pyaro_readers/eeareader/EEATimeseriesReader.py b/src/pyaro_readers/eeareader/EEATimeseriesReader.py
new file mode 100644
index 0000000..98b607a
--- /dev/null
+++ b/src/pyaro_readers/eeareader/EEATimeseriesReader.py
@@ -0,0 +1,267 @@
+import csv
+from io import BytesIO
+from urllib.parse import urlparse
+from urllib.request import urlopen
+from zipfile import BadZipFile, ZipFile
+
+from geocoder_reverse_natural_earth import (
+    Geocoder_Reverse_NE,
+    Geocoder_Reverse_Exception,
+)
+import numpy as np
+import requests
+from pyaro.timeseries import (
+    AutoFilterReaderEngine,
+    Data,
+    Flag,
+    NpStructuredData,
+    Station,
+)
+from tqdm import tqdm
+
+# default URL
+BASE_URL = "https://aeronet.gsfc.nasa.gov/data_push/V3/All_Sites_Times_Daily_Averages_AOD20.zip"
+# number of header lines to read before the reading is handed to Python's csv reader
+HEADER_LINE_NO = 7
+DELIMITER = ","
+
+NAN_VAL = -999.0
+# update progress bar every N lines...
+PG_UPDATE_LINES = 100
+# main variables to store
+LAT_NAME = "Site_Latitude(Degrees)"
+LON_NAME = "Site_Longitude(Degrees)"
+ALT_NAME = "Site_Elevation(m)"
+SITE_NAME = "AERONET_Site_Name"
+DATE_NAME = "Date(dd:mm:yyyy)"
+TIME_NAME = "Time(hh:mm:ss)"
+AOD500_NAME = "AOD_500nm"
+ANG4487_NAME = "440-870_Angstrom_Exponent"
+AOD440_NAME = "AOD_440nm"
+AOD870_NAME = "AOD_870nm"
+AOD550_NAME = "AOD_550nm"
+
+DATA_VARS = [AOD500_NAME, ANG4487_NAME, AOD440_NAME, AOD870_NAME]
+COMPUTED_VARS = [AOD550_NAME]
+# the computed variables have to be listed after the variables they are
+# computed from, otherwise the calculation will fail!
+DATA_VARS.extend(COMPUTED_VARS)
+
+FILL_COUNTRY_FLAG = False
+
+TS_TYPE_DIFFS = {
+    "daily": np.timedelta64(12, "h"),
+    "instantaneous": np.timedelta64(0, "s"),
+    "points": np.timedelta64(0, "s"),
+    "monthly": np.timedelta64(15, "D"),
+}
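+
+# Worked example (illustrative): the timestamps in the file are middle times,
+# so for ts_type "daily" a row stamped 1993-06-16T12:00 gets
+# start = 1993-06-16T00:00 and end = 1993-06-17T00:00, i.e.
+# middle_time -/+ TS_TYPE_DIFFS["daily"].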
+
+
+class EEATimeseriesReader(AutoFilterReaderEngine.AutoFilterReader):
+    def __init__(
+        self,
+        filename,
+        filters=[],
+        fill_country_flag: bool = FILL_COUNTRY_FLAG,
+        tqdm_desc: str | None = None,
+        ts_type: str = "daily",
+    ):
+        """open a new EEA timeseries-reader (for now a copy of the AeronetSun reader)
+
+        :param filename: path-like object or URL pointing to the csv-file
+        :param filters: list of pyaro filters to apply
+        :param fill_country_flag: whether to look up the country code from the station coordinates
+        :param tqdm_desc: description shown on the progress bar
+        :param ts_type: temporal resolution of the file; one of TS_TYPE_DIFFS' keys
+
+        input file looks like this (daily file; times noted are middle times):
+        AERONET Version 3;
+        Cuiaba
+        Version 3: AOD Level 2.0
+        The following data are automatically cloud cleared and quality assured with pre-field and post-field calibration applied.
+        Contact: PI=Pawan Gupta and Elena Lind; PI Email=Pawan.Gupta@nasa.gov and Elena.Lind@nasa.gov
+        Daily Averages,UNITS can be found at,,, https://aeronet.gsfc.nasa.gov/new_web/units.html
+        AERONET_Site,Date(dd:mm:yyyy),Time(hh:mm:ss),Day_of_Year,AOD_1640nm,AOD_1020nm,AOD_870nm,AOD_865nm,AOD_779nm,AOD_675nm,AOD_667nm,AOD_620nm,AOD_560nm,AOD_555nm,AOD_551nm,AOD_532nm,AOD_531nm,AOD_510nm,AOD_500nm,AOD_490nm,AOD_443nm,AOD_440nm,AOD_412nm,AOD_400nm,AOD_380nm,AOD_340nm,Precipitable_Water(cm),AOD_681nm,AOD_709nm,AOD_Empty,AOD_Empty,AOD_Empty,AOD_Empty,AOD_Empty,440-870_Angstrom_Exponent,380-500_Angstrom_Exponent,440-675_Angstrom_Exponent,500-870_Angstrom_Exponent,340-440_Angstrom_Exponent,440-675_Angstrom_Exponent[Polar],N[AOD_1640nm],N[AOD_1020nm],N[AOD_870nm],N[AOD_865nm],N[AOD_779nm],N[AOD_675nm],N[AOD_667nm],N[AOD_620nm],N[AOD_560nm],N[AOD_555nm],N[AOD_551nm],N[AOD_532nm],N[AOD_531nm],N[AOD_510nm],N[AOD_500nm],N[AOD_490nm],N[AOD_443nm],N[AOD_440nm],N[AOD_412nm],N[AOD_400nm],N[AOD_380nm],N[AOD_340nm],N[Precipitable_Water(cm)],N[AOD_681nm],N[AOD_709nm],N[AOD_Empty],N[AOD_Empty],N[AOD_Empty],N[AOD_Empty],N[AOD_Empty],N[440-870_Angstrom_Exponent],N[380-500_Angstrom_Exponent],N[440-675_Angstrom_Exponent],N[500-870_Angstrom_Exponent],N[340-440_Angstrom_Exponent],N[440-675_Angstrom_Exponent[Polar]],Data_Quality_Level,AERONET_Instrument_Number,AERONET_Site_Name,Site_Latitude(Degrees),Site_Longitude(Degrees),Site_Elevation(m)
+        Cuiaba,16:06:1993,12:00:00,167,-999.,0.081800,0.088421,-999.,-999.,0.095266,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,0.117581,-999.,-999.,-999.,0.149887,2.487799,-999.,-999.,-999.,-999.,-999.,-999.,-999.,0.424234,-999.,0.497630,-999.,0.924333,-999.,0,3,3,0,0,3,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,3,6,0,0,0,0,0,0,0,3,0,3,0,3,0,lev20,3,Cuiaba,-15.555244,-56.070214,234.000000
+        Cuiaba,17:06:1993,12:00:00,168,-999.,0.092246,0.099877,-999.,-999.,0.110915,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,-999.,0.144628,-999.,-999.,-999.,0.187276,2.592902,-999.,-999.,-999.,-999.,-999.,-999.,-999.,0.547807,-999.,0.628609,-999.,0.988320,-999.,0,16,16,0,0,16,0,0,0,0,0,0,0,0,0,0,0,16,0,0,0,16,32,0,0,0,0,0,0,0,16,0,16,0,16,0,lev20,3,Cuiaba,-15.555244,-56.070214,234.000000
+        """
+        self._filename = filename
+        self._stations = {}
+        self._data = {}  # var -> {data-array}
+        self._set_filters(filters)
+        self._header = []
+        _laststatstr = ""
+
+        # check if file is a URL
+        if self.is_valid_url(self._filename):
+            # try to open as zipfile
+            try:
+                r = requests.get(self._filename)
+                zip_ref = ZipFile(BytesIO(r.content))
+                for file in zip_ref.namelist():
+                    with zip_ref.open(file) as response:
+                        lines = [line.decode("utf-8") for line in response.readlines()]
+                        # read only the first file of the archive here
+                        break
+            except BadZipFile:
+                response = urlopen(self._filename)
+                lines = [line.decode("utf-8") for line in response.readlines()]
+
+        else:
+            with open(self._filename, newline="") as csvfile:
+                lines = csvfile.readlines()
+
+        for _hidx in range(HEADER_LINE_NO - 1):
+            self._header.append(lines.pop(0))
+        # read the field names from the header line ourselves (although csv could do
+        # that as well) since we might want to adjust these names
+        self._fields = lines.pop(0).strip().split(",")
+
+        gcd = Geocoder_Reverse_NE()
+        crd = csv.DictReader(lines, fieldnames=self._fields, delimiter=DELIMITER)
+        bar = tqdm(desc=tqdm_desc, total=len(lines))
+        for _ridx, row in enumerate(crd):
+            bar.update(1)
+            if row[SITE_NAME] != _laststatstr:
+                _laststatstr = row[SITE_NAME]
+                # new station
+                station = row[SITE_NAME]
+                lon = float(row[LON_NAME])
+                lat = float(row[LAT_NAME])
+                alt = float(row[ALT_NAME])
+                if fill_country_flag:
+                    try:
+                        country = gcd.lookup(lat, lon)["ISO_A2_EH"]
+                    except Geocoder_Reverse_Exception:
+                        country = "NN"
+                else:
+                    country = "NN"
+
+                # units of Aeronet data are always 1
+                units = "1"
+                if station not in self._stations:
+                    self._stations[station] = Station(
+                        {
+                            "station": station,
+                            "longitude": lon,
+                            "latitude": lat,
+                            "altitude": alt,
+                            "country": country,
+                            "url": "",
+                            "long_name": station,
+                        }
+                    )
+            # every line contains all variables, sometimes filled with NaNs though
+            if _ridx == 0:
+                for variable in DATA_VARS:
+                    if variable in self._data:
+                        da = self._data[variable]
+                        if da.units != units:
+                            raise Exception(
+                                f"unit change from '{da.units}' to '{units}'"
+                            )
+                    else:
+                        da = NpStructuredData(variable, units)
+                        self._data[variable] = da
+
+            day, month, year = row[DATE_NAME].split(":")
+            datestring = "-".join([year, month, day])
+            datestring = "T".join([datestring, row[TIME_NAME]])
+            time_dummy = np.datetime64(datestring)
+            start = time_dummy - TS_TYPE_DIFFS[ts_type]
+            end = time_dummy + TS_TYPE_DIFFS[ts_type]
+
+            ts_dummy_data = {}
+            for variable in DATA_VARS:
+                try:
+                    value = float(row[variable])
+                    if value == NAN_VAL:
+                        value = np.nan
+                    # store value in ts_dummy_data, so we don't need to perform the
+                    # nan check for each component of calculated values again
+                    ts_dummy_data[variable] = value
+                except KeyError:
+                    # computed variable
+                    if variable == AOD550_NAME:
+                        value = self.compute_od_from_angstromexp(
+                            0.55,
+                            ts_dummy_data[AOD440_NAME],
+                            0.44,
+                            ts_dummy_data[ANG4487_NAME],
+                        )
+                self._data[variable].append(
+                    value, station, lat, lon, alt, start, end, Flag.VALID, np.nan
+                )
+        bar.close()
+
+    def _unfiltered_data(self, varname) -> Data:
+        return self._data[varname]
+
+    def _unfiltered_stations(self) -> dict[str, Station]:
+        return self._stations
+
+    def _unfiltered_variables(self) -> list[str]:
+        return list(self._data.keys())
+
+    def close(self):
+        pass
+
+    def compute_od_from_angstromexp(
+        self, to_lambda: float, od_ref: float, lambda_ref: float, angstrom_coeff: float
+    ) -> float:
+        """Compute AOD at specified wavelength
+
+        Uses Angstrom coefficient and reference AOD to compute the
+        corresponding wavelength-shifted AOD
+
+        Parameters
+        ----------
+        to_lambda : :obj:`float` or :obj:`ndarray`
+            wavelength for which AOD is calculated
+        od_ref : :obj:`float` or :obj:`ndarray`
+            reference AOD
+        lambda_ref : :obj:`float` or :obj:`ndarray`
+            wavelength corresponding to reference AOD
+        angstrom_coeff : :obj:`float` or :obj:`ndarray`
+            Angstrom coefficient
+
+        Returns
+        -------
+        :obj:`float` or :obj:`ndarray`
+            AOD(s) at shifted wavelength
+
+        """
+        return od_ref * (lambda_ref / to_lambda) ** angstrom_coeff
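+
+    # Worked example for compute_od_from_angstromexp (illustrative numbers):
+    # shifting a reference AOD of 0.5 at 440 nm to 550 nm with an Angstrom
+    # coefficient of 1.0 gives 0.5 * (0.44 / 0.55) ** 1.0 = 0.4.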
+
+    def calc_angstroem_coeff(
+        self, od1: float, od2: float, wl1: float, wl2: float
+    ) -> float:
+        """
+        small helper method to calculate the Angstrom coefficient
+
+        :param od1: AOD at wavelength wl1
+        :param od2: AOD at wavelength wl2
+        :param wl1: first wavelength
+        :param wl2: second wavelength
+        :return: Angstrom coefficient
+        """
+        return -np.log(od1 / od2) / np.log(wl1 / wl2)
+
+    def is_valid_url(self, url):
+        """check if url is a valid URL (has scheme and netloc)"""
+        try:
+            result = urlparse(url)
+            return all([result.scheme, result.netloc])
+        except ValueError:
+            return False
+
+
+class EEATimeseriesEngine(AutoFilterReaderEngine.AutoFilterEngine):
+    def reader_class(self):
+        return EEATimeseriesReader
+
+    def open(self, filename, *args, **kwargs) -> EEATimeseriesReader:
+        return self.reader_class()(filename, *args, **kwargs)
+
+    def description(self):
+        return "Simple reader of EEA-files using the pyaro infrastructure (currently based on the AeronetSun reader)"
+
+    def url(self):
+        return "https://github.com/metno/pyaro-readers"
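+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (illustrative only, not part of the module API):
+    # read the default Aeronet-format daily file and list the variables found.
+    # Assumes network access to BASE_URL; normally the reader would be opened
+    # through the pyaro engine interface instead of being instantiated directly.
+    reader = EEATimeseriesReader(BASE_URL, tqdm_desc="reading AODs")
+    print(reader.variables())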
diff --git a/src/pyaro_readers/eeareader/eeadownloader.py b/src/pyaro_readers/eeareader/eeadownloader.py
new file mode 100644
index 0000000..2d91a71
--- /dev/null
+++ b/src/pyaro_readers/eeareader/eeadownloader.py
@@ -0,0 +1,65 @@
+import pprint
+
+import polars as pl
+import requests
+
+
+class EEADownloader:
+    BASE_URL = "https://eeadmz1-downloads-api-appservice.azurewebsites.net/"
+    ENDPOINT = "ParquetFile"
+
+    # template for the POST body expected by the download API
+    request_body = dict(
+        countries=[],
+        cities=[],
+        properties=[],
+        datasets=[],
+        source="",
+    )
+
+    def __init__(self) -> None:
+        pass
+
+    def _make_request(self, request: dict) -> bytes:
+        results = requests.post(self.BASE_URL + self.ENDPOINT, json=request)
+        # raises requests.HTTPError for non-2xx responses
+        results.raise_for_status()
+        return results.content
+
+    def get_countries(self):
+        countries = requests.get(self.BASE_URL + "Country").json()
+        pprint.pp(countries)
+
+    def open_files(self, folder):
+        # alternative: pyarrow.dataset.dataset(folder, format="parquet")
+        # scan_parquet returns a polars LazyFrame; nothing is read until .collect()
+        df = pl.scan_parquet(folder)
+        return df
+
+
+if __name__ == "__main__":
+    eead = EEADownloader()
+    request = {
+        "countries": ["NO"],
+        "cities": [],
+        "properties": [],
+        "datasets": [1, 2],
+        "source": "Api",
+    }
+    # result = eead._make_request(request)
+    # with open("NO_EEA.zip", "wb") as f:
+    #     f.write(result)
+
+    df = eead.open_files("NO_EEA/E1a/*.parquet")
+    breakpoint()
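+    # Illustrative follow-up (the column names "Samplingpoint" and "Value" are
+    # assumptions about the EEA parquet schema; adjust to the real files):
+    # lazily filter and aggregate before collecting into memory, e.g.
+    # df.filter(pl.col("Value") >= 0).group_by("Samplingpoint").agg(
+    #     pl.col("Value").mean()
+    # ).collect()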