Skip to content

Commit

Permalink
Add specialised selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
magnusuMET committed Nov 13, 2024
1 parent 76d9447 commit 72b7197
Showing 1 changed file with 42 additions and 8 deletions.
50 changes: 42 additions & 8 deletions src/pyaro_readers/eeareader/EEATimeseriesReader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from typing import Any, Literal
from collections.abc import Iterable
import importlib.resources
import dataclasses
Expand Down Expand Up @@ -192,7 +192,10 @@ class EEATimeseriesReader(Reader):
]

def __init__(
self, filename_or_obj_or_url, filters=None, enable_progressbar: bool = False
self, filename_or_obj_or_url, filters=None, enable_progressbar: bool = False,
dataset: Literal["historical", "verified", "unverified"] = "unverified",
station_area: str | list[str] = "all",
station_type: str | list[str] = "all",
):
data_directory = Path(filename_or_obj_or_url)
metadata_file = data_directory.joinpath("metadata.csv")
Expand All @@ -203,6 +206,7 @@ def __init__(
"Detection Limit": polars.Float32,
},
)
self._dataset = dataset

# Vocabulary as found at https://dd.eionet.europa.eu/vocabulary/aq/pollutant
pollutant_file = importlib.resources.files("pyaro_readers.eeareader").joinpath(
Expand Down Expand Up @@ -232,6 +236,17 @@ def __init__(
self._data_directory = data_directory
self._progressbar_enabled = enable_progressbar

# Normalise both station selectors to lists, so a caller may pass either a
# single string or several values. Each selector is stored under its own
# attribute; the list is copied so later caller-side mutation has no effect.
if isinstance(station_area, str):
    self._station_area = [station_area]
else:
    self._station_area = list(station_area)

if isinstance(station_type, str):
    self._station_type = [station_type]
else:
    self._station_type = list(station_type)

def metadata(self) -> dict[str, str]:
metadata = dict()
metadata["what"] = "EEA reader"
Expand Down Expand Up @@ -266,13 +281,18 @@ def _read(
variable_id = pollutant_candidates["Id"][0]

filters = _transform_filters(self._filters, variable_id)

historical_path = self._data_directory.joinpath("historical")
verified_path = self._data_directory.joinpath("verified")
unverified_path = self._data_directory.joinpath("unverified")

# Exactly one dataset subdirectory is searched, selected by the `dataset`
# value given at construction time.
if self._dataset == "historical":
    searchpaths = [historical_path]
elif self._dataset == "verified":
    searchpaths = [verified_path]
elif self._dataset == "unverified":
    searchpaths = [unverified_path]
else:
    # An unknown name would otherwise leave the search list empty and
    # silently produce no data.
    raise ValueError(
        f"Unknown dataset {self._dataset!r}; expected 'historical', "
        "'verified' or 'unverified'"
    )

dataset = polars.DataFrame(
schema={
Expand Down Expand Up @@ -332,9 +352,17 @@ def _read(
"Longitude",
"Latitude",
"Duration Unit",
"Air Quality Station Area",
"Air Quality Station Type",
]
)

# Optional station selectors. The value ["all"] means "no restriction", so a
# filter expression is only added when specific values were requested.
# The selectors live on the instance; they are not local names in this method.
extra_filters = []
if self._station_area != ["all"]:
    extra_filters.append(
        polars.col("Air Quality Station Area").is_in(self._station_area)
    )
if self._station_type != ["all"]:
    extra_filters.append(
        polars.col("Air Quality Station Type").is_in(self._station_type)
    )

# OBS: Times are given in this timezone for non-daily observations
# this assumption is also used for pyarrow filtering
original_timezone_for_hourly_data = "Etc/GMT+1"
Expand All @@ -352,6 +380,7 @@ def _read(
)
.filter(
polars.col("Duration Unit").eq("hour"),
*extra_filters,
)
)

Expand Down Expand Up @@ -418,7 +447,7 @@ def close(self) -> None:


class EEATimeseriesEngine(Engine):
args: list[str] = ["filename_or_obj_or_url", "enable_progressbar"]
args: list[str] = ["filename_or_obj_or_url", "enable_progressbar", "dataset", "station_area", "station_type"]
supported_filters: list[str] = EEATimeseriesReader.supported_filters
description: str = """EEA reader for parquet files
Expand Down Expand Up @@ -448,11 +477,16 @@ class EEATimeseriesEngine(Engine):
url: str = "https://github.com/metno/pyaro-readers"

def open(
    self,
    filename_or_obj_or_url,
    enable_progressbar: bool = False,
    dataset: Literal["historical", "verified", "unverified"] = "unverified",
    station_area: str | list[str] = "all",
    station_type: str | list[str] = "all",
    *,
    filters=None,
):
    """Create an EEATimeseriesReader for a data directory.

    Every argument is forwarded unchanged to the reader's constructor.

    :param filename_or_obj_or_url: path of the data directory; the reader
        looks for ``metadata.csv`` inside it.
    :param enable_progressbar: forwarded to the reader.
    :param dataset: which dataset subdirectory the reader searches.
    :param station_area: one value or a list of values to keep from the
        "Air Quality Station Area" column; ``"all"`` disables the filter.
    :param station_type: one value or a list of values to keep from the
        "Air Quality Station Type" column; ``"all"`` disables the filter.
    :param filters: forwarded to the reader.
    :return: the constructed EEATimeseriesReader.
    """
    return EEATimeseriesReader(
        filename_or_obj_or_url,
        filters=filters,
        enable_progressbar=enable_progressbar,
        dataset=dataset,
        # Without these two the station selectors given to the engine
        # would be silently ignored.
        station_area=station_area,
        station_type=station_type,
    )

Expand Down

0 comments on commit 72b7197

Please sign in to comment.