
Commit

WIP: try reading actual data
Jan Griesfeller committed Sep 4, 2024
1 parent 37a7970 commit 14190d7
Showing 3 changed files with 66 additions and 84 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = pyaro_readers
version = 0.0.10.dev0
version = 0.0.10.dev1
author = MET Norway
description = implementations of pyaerocom reading plugins using pyaro as interface
long_description = file: README.md
60 changes: 46 additions & 14 deletions src/pyaro_readers/actrisebas/ActrisEbasReader.py
@@ -1,8 +1,10 @@
import logging
import tomllib
from io import BytesIO
from urllib.parse import urlparse
from urllib.parse import urlparse, quote
from urllib.request import urlopen
from urllib3.util.retry import Retry
from urllib3.poolmanager import PoolManager

import numpy as np
import requests
@@ -25,26 +27,33 @@
# BASE_API_URL = "https://prod-actris-md.nilu.no/Vocabulary/categories"
BASE_API_URL = "https://prod-actris-md.nilu.no/"
# base URL to query for data for a certain variable
VAR_QUERY_URL = f"{BASE_API_URL}/content/"
VAR_QUERY_URL = f"{BASE_API_URL}Metadata/content/"
# basename of definitions.toml which connects the pyaerocom variable names with the ACTRIS variable names
DEFINITION_FILE_BASENAME = "definitions.toml"

DEFINITION_FILE = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
DEFINITION_FILE_BASENAME)
os.path.dirname(os.path.realpath(__file__)), DEFINITION_FILE_BASENAME
)

# number of times an api request is tried before we consider it failed
MAX_RETRIES = 2


class ActrisEbasRetryException(Exception):
pass


class ActrisEbasTimeSeriesReader(AutoFilterReaderEngine.AutoFilterReader):
def __init__(
self,
filename,
# filename,
filters=[],
var_name="ozone mass concentration",
tqdm_desc: str | None = None,
ts_type: str = "daily",
):
"""
"""
self._filename = filename
""" """
self._filename = None
self._stations = {}
self._data = {} # var -> {data-array}
self._set_filters(filters)
@@ -53,9 +62,30 @@ def __init__(
self._revision = datetime.datetime.min
# read config file
self._def_data = self._read_definitions(file=DEFINITION_FILE)

# bar = tqdm(desc=tqdm_desc, total=len(lines))
# bar.close()
if not isinstance(var_name, list):
var_name = [var_name]

for var in var_name:
# search for variable metadata
query_url = f"{VAR_QUERY_URL}{quote(var)}"
retries = Retry(connect=5, read=2, redirect=5)
http = PoolManager(retries=retries)
response = http.request("GET", query_url)

# retry_counter = MAX_RETRIES
# while retry_counter > 0:
# try:
# response = requests.get(query_url)
# response.raise_for_status() # raises an HTTPError if the status code is >= 400
# except requests.exceptions.HTTPError as err:
# print(f"Error: {err}")

# text_resp = response.data.decode('utf-8')
json_resp = json.loads(response.data.decode("utf-8"))
print(json_resp)

bar = tqdm(desc=tqdm_desc, total=len(lines))
bar.close()

def metadata(self):
return dict(revision=datetime.datetime.strftime(self._revision, "%y%m%d%H%M%S"))
@@ -75,7 +105,7 @@ def close(self):
def _read_definitions(self, file=DEFINITION_FILE):
# definitions file for a connection between aerocom names, ACTRIS vocabulary and EBAS vocabulary
# The EBAS part will hopefully not be necessary in the next EBAS version anymore
with open(file, 'rb') as fh:
with open(file, "rb") as fh:
tmp = tomllib.load(fh)
return tmp

@@ -91,8 +121,10 @@ class ActrisEbasTimeSeriesEngine(AutoFilterReaderEngine.AutoFilterEngine):
def reader_class(self):
return ActrisEbasTimeSeriesReader

def open(self, filename, *args, **kwargs) -> ActrisEbasTimeSeriesReader:
return self.reader_class()(filename, *args, **kwargs)
# def open(self, filename, *args, **kwargs) -> ActrisEbasTimeSeriesReader:
def open(self, *args, **kwargs) -> ActrisEbasTimeSeriesReader:
# return self.reader_class()(filename, *args, **kwargs)
return self.reader_class()(*args, **kwargs)

def description(self):
return "ACTRIS EBAS reader using the pyaro infrastructure"
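The constructor now queries the ACTRIS metadata API inline with a urllib3 PoolManager, while the newly added MAX_RETRIES constant and ActrisEbasRetryException are not wired up yet (the requests-based retry loop remains commented out). Below is a minimal sketch, not part of this commit, of how the query could be factored into a retried helper; the names _get_json and variable_metadata and their error handling are assumptions, not the reader's actual API.

# Sketch only, not part of this commit. Reuses the constants and exception the
# commit introduces; _get_json / variable_metadata are assumed helper names.
import json
from urllib.parse import quote

from urllib3.poolmanager import PoolManager
from urllib3.util.retry import Retry

BASE_API_URL = "https://prod-actris-md.nilu.no/"
VAR_QUERY_URL = f"{BASE_API_URL}Metadata/content/"
# number of times an api request is tried before we consider it failed
MAX_RETRIES = 2


class ActrisEbasRetryException(Exception):
    pass


def _get_json(url: str) -> dict:
    """GET url, retrying transient transport failures, and decode the JSON body."""
    retries = Retry(total=MAX_RETRIES, connect=5, read=2, redirect=5)
    http = PoolManager(retries=retries)
    response = http.request("GET", url)
    if response.status >= 400:
        raise ActrisEbasRetryException(
            f"API request failed with status {response.status}: {url}"
        )
    return json.loads(response.data.decode("utf-8"))


def variable_metadata(var_name: str) -> dict:
    """Look up ACTRIS metadata for one variable, e.g. 'ozone mass concentration'."""
    # e.g. https://prod-actris-md.nilu.no/Metadata/content/ozone%20mass%20concentration
    return _get_json(f"{VAR_QUERY_URL}{quote(var_name)}")

Note that Retry configured this way only covers transport-level failures (connection, read, redirect); retrying on HTTP error status codes, which the commented-out requests loop hints at, would additionally need the status / status_forcelist options.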
88 changes: 19 additions & 69 deletions tests/test_ActrisEbasReader.py
@@ -6,21 +11,26 @@
import pyaro.timeseries
from pyaro.timeseries.Wrappers import VariableNameChangingReader

TEST_URL = "https://pyaerocom.met.no/pyaro-suppl/testdata/aeronetsun_testdata.csv"
TEST_URL = "https://prod-actris-md.nilu.no/Version"
VOCABULARY_URL = "https://prod-actris-md.nilu.no/V"
TEST_ZIP_URL = (
"https://pyaerocom.met.no/pyaro-suppl/testdata/aeronetsun_testdata.csv.zip"
)
AERONETSUN_URL = "https://aeronet.gsfc.nasa.gov/data_push/V3/All_Sites_Times_Daily_Averages_AOD20.zip"


class TestActrisEbasTimeSeriesReader(unittest.TestCase):
file = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"testdata",
"aeronetsun_testdata.csv",
)

# def external_resource_available(self, url):
def test_api_online(self, url=TEST_URL):
try:
req = urllib.request.Request(TEST_URL, method="HEAD")
resp = urllib.request.urlopen(req)
resp.url
return True
except:
return False

# def test_vocabulary(self, url=TEST_URL):
# try:
# req = urllib.request.Request(TEST_URL, method="HEAD")
# resp = urllib.request.urlopen(req)
@@ -29,75 +34,20 @@ class TestActrisEbasTimeSeriesReader(unittest.TestCase):
# except:
# return False
#
# def test_dl_data_unzipped(self):
# if not self.external_resource_available(TEST_URL):
# self.skipTest(f"external resource not available: {TEST_URL}")
# engine = pyaro.list_timeseries_engines()["actrisebasreader"]
# with engine.open(
# TEST_URL,
# filters=[],
# fill_country_flag=False,
# tqdm_desc="test_dl_data_unzipped",
# ) as ts:
# count = 0
# for var in ts.variables():
# count += len(ts.data(var))
# self.assertEqual(count, 49965)
# self.assertEqual(len(ts.stations()), 4)
# self.assertGreaterEqual(int(ts.metadata()["revision"]), 220622120000)
#
# def test_dl_data_zipped(self):
# if not self.external_resource_available(TEST_ZIP_URL):
# self.skipTest(f"external resource not available: {TEST_ZIP_URL}")
# engine = pyaro.list_timeseries_engines()["actrisebasreader"]
# with engine.open(
# TEST_ZIP_URL,
# filters=[],
# fill_country_flag=False,
# tqdm_desc="test_dl_data_zipped",
# ) as ts:
# count = 0
# for var in ts.variables():
# count += len(ts.data(var))
# self.assertEqual(count, 49965)
# self.assertEqual(len(ts.stations()), 4)
# self.assertGreaterEqual(int(ts.metadata()["revision"]), 220622120000)
#
# def test_aeronet_data_zipped(self):
# if not os.path.exists("/lustre"):
# self.skipTest(f"lustre not available; skipping Aeronet download on CI")
#
# if not self.external_resource_available(AERONETSUN_URL):
# self.skipTest(f"external resource not available: {AERONETSUN_URL}")
# engine = pyaro.list_timeseries_engines()["actrisebasreader"]
# with engine.open(
# AERONETSUN_URL,
# filters=[],
# fill_country_flag=False,
# tqdm_desc="aeronet data zipped",
# ) as ts:
# count = 0
# for var in ts.variables():
# count += len(ts.data(var))
# self.assertGreaterEqual(count, 49965)
# self.assertGreaterEqual(len(ts.stations()), 4)
# self.assertGreaterEqual(int(ts.metadata()["revision"]), 240523120000)

def test_init(self):
engine = pyaro.list_timeseries_engines()["actrisebas"]
self.assertEqual(engine.url(), "https://github.com/metno/pyaro-readers")
# just see that it doesn't fail
engine.description()
engine.args()
# with engine.open(
# self.file, filters=[], fill_country_flag=True, tqdm_desc="test_init"
# ) as ts:
# count = 0
# for var in ts.variables():
# count += len(ts.data(var))
# self.assertEqual(count, 49965)
# self.assertEqual(len(ts.stations()), 4)
assert engine.args()

def test_api_reading(self):
# test access to the EBAS API
engine = pyaro.list_timeseries_engines()["actrisebas"]
with engine.open(var_name="ozone mass concentration", filters=[]) as ts:
# test that the definitions file could be read properly
self.assertGreaterEqual(len(ts._def_data["variables"]), 2)
# def test_stationfilter(self):
# engine = pyaro.list_timeseries_engines()["aeronetsunreader"]
# sfilter = pyaro.timeseries.filters.get("stations", exclude=["Cuiaba"])
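With this commit the engine's open() no longer takes a filename; the variable to read is selected via the var_name keyword instead, as exercised by test_api_reading above. A short usage sketch, not part of the commit, assuming pyaro-readers is installed with the actrisebas entry point and the ACTRIS metadata API is reachable:

# Usage sketch (not part of the commit): open the reader without a filename.
import pyaro

engine = pyaro.list_timeseries_engines()["actrisebas"]
with engine.open(var_name="ozone mass concentration", filters=[]) as ts:
    # at this WIP stage only the variable metadata and definitions.toml are read
    print(ts.metadata())
    print(len(ts._def_data["variables"]))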
