From 100ffc57bb97908cdaeadc06477b00576b8ac8c7 Mon Sep 17 00:00:00 2001
From: Magnus Ulimoen
Date: Tue, 5 Nov 2024 10:43:11 +0100
Subject: [PATCH] Use correct timezone for reading hourly data

---
 .../eeareader/EEATimeseriesReader.py | 45 ++++++++++++++-----
 1 file changed, 35 insertions(+), 10 deletions(-)

diff --git a/src/pyaro_readers/eeareader/EEATimeseriesReader.py b/src/pyaro_readers/eeareader/EEATimeseriesReader.py
index 2a4e738..84d6117 100644
--- a/src/pyaro_readers/eeareader/EEATimeseriesReader.py
+++ b/src/pyaro_readers/eeareader/EEATimeseriesReader.py
@@ -1,5 +1,5 @@
 import logging
-from datetime import datetime
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Any
 from collections.abc import Iterable
@@ -89,7 +89,6 @@ def _nrecords(self) -> int:


 def _read(filepath: Path, pyarrow_filters) -> polars.DataFrame:
-    # TODO: Timezone fixup??
     return polars.read_parquet(
         filepath,
         use_pyarrow=True,
@@ -121,6 +120,13 @@ def _pyarrow_timefilter(
     # TODO: Make this support more filtering whilst reading
     min_time, max_time = filter.envelope()

+    # OBS: Critical assumption
+    # Times for HOURLY data are given in UTC+1, but the input filters
+    # assume UTC. We must therefore add an hour to the envelope bounds
+    offset = timedelta(hours=1)
+    min_time += offset
+    max_time += offset
+
     return [
         ("Start", ">=", min_time),
         ("Start", "<=", max_time),
@@ -294,27 +300,43 @@ def _read(
             dataset.vstack(_read(file, filters.pyarrow), in_place=True)

-        dataset = dataset.rechunk()
-
         # Join with metadata table to get latitude, longitude and altitude
         metadata = self._metadata.with_columns(
             (
                 polars.col("Country").map_elements(_country_code_eea, return_dtype=str)
                 + "/"
                 + polars.col("Sampling Point Id")
-            ).alias("selector")
+            ).alias("selector"),
         ).select(
             [
                 "selector",
                 "Altitude",
                 "Longitude",
                 "Latitude",
+                "Duration Unit",
             ]
         )
-        joined = dataset.join(
-            metadata, left_on="Samplingpoint", right_on="selector", how="left"
+        # OBS: Times for non-daily observations are given in this timezone (fixed UTC+1).
+        # Note that the IANA "Etc/" names invert the sign, so UTC+1 is "Etc/GMT-1".
+        # The same assumption is used for the pyarrow time filtering.
+        original_timezone_for_hourly_data = "Etc/GMT-1"
+        joined = (
+            dataset.join(
+                metadata, left_on="Samplingpoint", right_on="selector", how="left"
+            )
+            .with_columns(
+                polars.col("Start")
+                .dt.replace_time_zone(original_timezone_for_hourly_data)
+                .dt.convert_time_zone("UTC"),
+                polars.col("End")
+                .dt.replace_time_zone(original_timezone_for_hourly_data)
+                .dt.convert_time_zone("UTC"),
+            )
+            .filter(
+                polars.col("Duration Unit").eq("hour"),
+            )
         )
+
         assert (
             joined.filter(polars.col("Longitude").is_null()).shape[0] == 0
         ), "Some stations does not have a suitable left join"
@@ -380,7 +402,10 @@ class EEATimeseriesEngine(Engine):
     args: list[str] = ["filename_or_obj_or_url", "enable_progressbar"]
     supported_filters: list[str] = EEATimeseriesReader.supported_filters
     description: str = """EEA reader for parquet files
-Files are downloaded from https://eeadmz1-downloads-webapp.azurewebsites.net/ using the following directory structure:
+
+Read and filter hourly data from EEA stations using the unverified dataset.
+
+Files must be downloaded from https://eeadmz1-downloads-webapp.azurewebsites.net/ using the following directory structure:
 datadir (this path should be passed to `open`)
 - metadata.csv (from https://discomap.eea.europa.eu/App/AQViewer/index.html?fqn=Airquality_Dissem.b2g.measurements)
 - historical (directory)
   - AT.parquet
   - ... (per country)
 - verified (directory)
   - AT.parquet
   - ...
 - unverified (directory)
   - AL
   - ...
-In each category (historical, verified, unverified) the EEA country codes are used
-for each country.
+In each category (historical, verified, unverified) the EEA country codes are used.
+This might differ from pyaro country codes.

 Data can be downloaded using the airbase tool (https://github.com/JohnPaton/airbase/)
 OBS: Must use github version, pypi version does not download parquet files yet
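For reference, below is a minimal, standalone sketch (not part of the patch) of the timezone handling the change relies on. It assumes hourly EEA values are stamped as fixed UTC+1 wall time, for which the IANA zone name is "Etc/GMT-1" (the sign in "Etc/" names is inverted); the DataFrame, variable names, and example timestamps are purely illustrative, while replace_time_zone/convert_time_zone and the one-hour envelope shift mirror what the patch does.

# Minimal sketch, assuming hourly EEA values are naive UTC+1 wall-clock times.
from datetime import datetime, timedelta

import polars

# Naive timestamps as they would appear in the parquet files (UTC+1 wall time).
df = polars.DataFrame(
    {"Start": [datetime(2024, 1, 1, 1, 0), datetime(2024, 1, 1, 2, 0)]}
)

# Attach the fixed UTC+1 zone to the naive values, then express them as UTC instants.
utc = df.with_columns(
    polars.col("Start")
    .dt.replace_time_zone("Etc/GMT-1")
    .dt.convert_time_zone("UTC")
)
print(utc)  # 2024-01-01 00:00:00 UTC and 2024-01-01 01:00:00 UTC

# A UTC filter envelope must be shifted by the same +1 hour before it is
# compared against the naive (UTC+1) "Start" column while reading with pyarrow.
min_time = datetime(2024, 1, 1, 0, 0)
max_time = datetime(2024, 1, 1, 1, 0)
offset = timedelta(hours=1)
pyarrow_filters = [
    ("Start", ">=", min_time + offset),
    ("Start", "<=", max_time + offset),
]
print(pyarrow_filters)

Running this prints the two instants shifted back by one hour (00:00 and 01:00 UTC) and the shifted filter tuples on "Start", matching the filter list that _pyarrow_timefilter returns in the patch.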