Use correct timezone for reading hourly data
magnusuMET committed Nov 5, 2024
1 parent b6d42d1 commit 100ffc5
Showing 1 changed file with 35 additions and 10 deletions.
45 changes: 35 additions & 10 deletions src/pyaro_readers/eeareader/EEATimeseriesReader.py
@@ -1,5 +1,5 @@
import logging
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from collections.abc import Iterable
@@ -89,7 +89,6 @@ def _nrecords(self) -> int:


def _read(filepath: Path, pyarrow_filters) -> polars.DataFrame:
# TODO: Timezone fixup??
return polars.read_parquet(
filepath,
use_pyarrow=True,
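For context, the rest of this call is collapsed in the diff view; a minimal standalone sketch of the usual way DNF-style filter tuples are forwarded to pyarrow through polars (the pyarrow_options argument is an assumption here, not taken from the collapsed lines):

    # Minimal sketch, assuming the filter tuples are forwarded to pyarrow
    # via polars' pyarrow_options argument; not taken from the collapsed diff.
    from pathlib import Path
    import polars

    def read_filtered(filepath: Path, pyarrow_filters) -> polars.DataFrame:
        # pyarrow_filters is a DNF list such as
        # [("Start", ">=", min_time), ("Start", "<=", max_time)]
        return polars.read_parquet(
            filepath,
            use_pyarrow=True,
            pyarrow_options={"filters": pyarrow_filters},
        )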
@@ -121,6 +120,13 @@ def _pyarrow_timefilter(
# TODO: Make this support more filtering whilst reading
min_time, max_time = filter.envelope()

# OBS: Critical assumption
# Timestamps for HOURLY data are given in UTC+1, but input filters
# assume UTC. We must therefore add an hour to both ends of the envelope.
offset = timedelta(hours=1)
min_time += offset
max_time += offset

return [
("Start", ">=", min_time),
("Start", "<=", max_time),
@@ -294,27 +300,43 @@ def _read(

dataset.vstack(_read(file, filters.pyarrow), in_place=True)

dataset = dataset.rechunk()

# Join with metadata table to get latitude, longitude and altitude
metadata = self._metadata.with_columns(
(
polars.col("Country").map_elements(_country_code_eea, return_dtype=str)
+ "/"
+ polars.col("Sampling Point Id")
).alias("selector")
).alias("selector"),
).select(
[
"selector",
"Altitude",
"Longitude",
"Latitude",
"Duration Unit",
]
)

joined = dataset.join(
metadata, left_on="Samplingpoint", right_on="selector", how="left"
# OBS: Times are given in this timezone for non-daily observations;
# this assumption is also used for the pyarrow filtering
original_timezone_for_hourly_data = "Etc/GMT+1"
joined = (
dataset.join(
metadata, left_on="Samplingpoint", right_on="selector", how="left"
)
.with_columns(
polars.col("Start")
.dt.replace_time_zone(original_timezone_for_hourly_data)
.dt.convert_time_zone("UTC"),
polars.col("End")
.dt.replace_time_zone(original_timezone_for_hourly_data)
.dt.convert_time_zone("UTC"),
)
.filter(
polars.col("Duration Unit").eq("hour"),
)
)

assert (
joined.filter(polars.col("Longitude").is_null()).shape[0] == 0
), "Some stations does not have a suitable left join"
@@ -380,7 +402,10 @@ class EEATimeseriesEngine(Engine):
args: list[str] = ["filename_or_obj_or_url", "enable_progressbar"]
supported_filters: list[str] = EEATimeseriesReader.supported_filters
description: str = """EEA reader for parquet files
Files are downloaded from https://eeadmz1-downloads-webapp.azurewebsites.net/ using the following directory structure:
Read and filter hourly data from EEA stations using the unverified dataset.
Files must be downloaded from https://eeadmz1-downloads-webapp.azurewebsites.net/ using the following directory structure:
datadir (this path should be passed to `open`)
- metadata.csv (from https://discomap.eea.europa.eu/App/AQViewer/index.html?fqn=Airquality_Dissem.b2g.measurements)
- historical (directory)
@@ -393,8 +418,8 @@ class EEATimeseriesEngine(Engine):
- AL
- ...
In each category (historical, verified, unverified) the EEA country codes are used
for each country.
In each category (historical, verified, unverified) the EEA country codes are used.
This might differ from pyaro country codes.
Data can be downloaded using the airbase tool (https://github.com/JohnPaton/airbase/)
OBS: The GitHub version must be used; the PyPI version does not download parquet files yet
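A hedged usage sketch for opening such a directory through pyaro (the engine registry name "eea" and the keyword argument below are assumptions, not taken from this diff):

    # Hedged sketch: engine name and keyword argument are assumptions.
    import pyaro

    print(pyaro.list_timeseries_engines())  # look up the registered name of this engine
    ts = pyaro.open_timeseries(
        "eea",                # assumed registry name for EEATimeseriesEngine
        "/path/to/datadir",   # the directory layout described above
        enable_progressbar=True,
    )
    print(ts.variables())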
