Adds filtering for invalid datapoints
dulte committed Aug 30, 2024
1 parent 88a8bf7 commit 4fbda1e
Showing 2 changed files with 117 additions and 100 deletions.
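The substantive change in EEATimeseriesReader.py is a validity filter applied while the per-station parquet files are read: rows whose validity flag is not positive are dropped before their values are copied into the output array. Below is a minimal, standalone sketch of that idea using polars; the file name "observations.parquet" is a placeholder and not part of this commit.

    import polars

    # Validity flags as listed in the reader's FLAGS_VALID table.
    FLAGS_VALID = {-99: False, -1: False, 1: True, 2: False, 3: False, 4: True}

    lf = polars.scan_parquet("observations.parquet")  # placeholder path
    valid = lf.filter(polars.col("Validity") > 0)  # drop invalid datapoints, as the reader now does
    n_kept = valid.select(polars.len()).collect()[0, 0]
    print(f"rows kept: {n_kept}")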
107 changes: 69 additions & 38 deletions src/pyaro_readers/eeareader/EEATimeseriesReader.py
@@ -22,7 +22,7 @@

FLAGS_VALID = {-99: False, -1: False, 1: True, 2: False, 3: False, 4: True}
VERIFIED_LVL = [1, 2, 3]
DATA_TOML = "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data.toml"
DATA_TOML = Path(__file__).parent / "data.toml"
FILL_COUNTRY_FLAG = False

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -57,7 +57,6 @@
start_times="Start",
end_times="End",
flags="Validity",
#countries="country",
)


@@ -78,11 +77,14 @@ def __init__(
filter_time = False
if "time_bounds" in filters:
if "start_include" in filters["time_bounds"]:
start_date = datetime.strptime(filters["time_bounds"]["start_include"][0][0], TIME_FORMAT)
end_date = datetime.strptime(filters["time_bounds"]["start_include"][0][1], TIME_FORMAT)
start_date = datetime.strptime(
filters["time_bounds"]["start_include"][0][0], TIME_FORMAT
)
end_date = datetime.strptime(
filters["time_bounds"]["start_include"][0][1], TIME_FORMAT
)
filter_time = True


if len(species) == 0:
raise ValueError(
f"As of now, you have to give the species you want to read in filter.variables.include"
@@ -96,11 +98,19 @@ def __init__(
)
for s in species:
files = self._create_file_list(filename, s)

if filter_time:
datapoints = self._filter_dates(polars.scan_parquet(files), (start_date, end_date)).select(polars.len()).collect()[0, 0]
datapoints = (
self._filter_dates(
polars.scan_parquet(files), (start_date, end_date)
)
.select(polars.len())
.collect()[0, 0]
)
else:
datapoints = polars.scan_parquet(files).select(polars.len()).collect()[0, 0]

datapoints = (
polars.scan_parquet(files).select(polars.len()).collect()[0, 0]
)

array = np.empty(datapoints, np.dtype(DTYPES))

@@ -109,34 +119,33 @@ def __init__(

current_idx = 0

# if filter_time:
# df = self._filter_dates(polars.scan_parquet(files), (start_date, end_date)).collect()
# else:
# df = polars.scan_parquet(files).collect()

# for key in tqdm(PARQUET_FIELDS):
# array[key] = df.get_column(PARQUET_FIELDS[key]).to_numpy()

for file in tqdm(files, disable=None):


for file in tqdm(files):

# Filters by time
if filter_time:
lf = self._filter_dates(polars.read_parquet(file), (start_date, end_date))
lf = self._filter_dates(
polars.read_parquet(file), (start_date, end_date)
)
if lf.is_empty():
#print(f"Empty filter for {file}")
continue
else:
lf = polars.read_parquet(file)

# Filters out invalid data
lf = lf.filter(polars.col(PARQUET_FIELDS["flags"]) > 0)

file_datapoints = lf.select(polars.len())[0, 0]

file_datapoints = lf.select(polars.len())[0,0]#.collect()
df = lf#.collect()
if file_datapoints == 0:
continue
df = lf

file_unit = df.row(0)[df.get_column_index("Unit")]

for key in PARQUET_FIELDS:
array[key][current_idx : current_idx + file_datapoints] = df.get_column(PARQUET_FIELDS[key]).to_numpy()
array[key][current_idx : current_idx + file_datapoints] = (
df.get_column(PARQUET_FIELDS[key]).to_numpy()
)

current_idx += file_datapoints

@@ -147,20 +156,30 @@ def __init__(
raise ValueError(
f"Found multiple units ({file_unit} and {species_unit}) for same species {s}"
)


metadatarow = df.row(0)
station_fields = {
"station": metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])],
"longitude": metadatarow[df.get_column_index(PARQUET_FIELDS["longitudes"])],
"latitude": metadatarow[df.get_column_index(PARQUET_FIELDS["latitudes"])],
"altitude": metadatarow[df.get_column_index(PARQUET_FIELDS["altitudes"])],
"station": metadatarow[
df.get_column_index(PARQUET_FIELDS["stations"])
],
"longitude": metadatarow[
df.get_column_index(PARQUET_FIELDS["longitudes"])
],
"latitude": metadatarow[
df.get_column_index(PARQUET_FIELDS["latitudes"])
],
"altitude": metadatarow[
df.get_column_index(PARQUET_FIELDS["altitudes"])
],
"country": metadatarow[df.get_column_index("country")],
"url": "",
"long_name": metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])],
"long_name": metadatarow[
df.get_column_index(PARQUET_FIELDS["stations"])
],
}
self._stations[metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])]] = Station(station_fields)

self._stations[
metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])]
] = Station(station_fields)

data = NpStructuredData(variable=s, units=species_unit)
data.set_data(variable=s, units=species_unit, data=array)
@@ -179,12 +198,17 @@ def _get_species_ids(self, species: list[str]) -> list[int]:
if poll[key] in species:
ids.append(key)
return ids

def _filter_dates(self, lf: polars.LazyFrame | polars.DataFrame, dates: tuple[datetime]) -> polars.LazyFrame | polars.DataFrame:

def _filter_dates(
self, lf: polars.LazyFrame | polars.DataFrame, dates: tuple[datetime]
) -> polars.LazyFrame | polars.DataFrame:
if dates[0] >= dates[1]:
raise ValueError(f"Error when filtering data. Last date {dates[1]} must be larger than the first {dates[0]}")
#return lf.with_columns(polars.col(PARQUET_FIELDS["start_times"]).str.strptime(polars.Date)).filter(polars.col(PARQUET_FIELDS["start_times"]).is_between(dates[0], dates[1]))
return lf.filter(polars.col(PARQUET_FIELDS["start_times"]).is_between(dates[0], dates[1]))
raise ValueError(
f"Error when filtering data. Last date {dates[1]} must be larger than the first {dates[0]}"
)
return lf.filter(
polars.col(PARQUET_FIELDS["start_times"]).is_between(dates[0], dates[1])
)

def _unfiltered_data(self, varname) -> Data:
return self._data[varname]
@@ -214,7 +238,14 @@ def url(self):


if __name__ == "__main__":
filters = {"variables": {"include": ["PM10"]}, "time": {"start": "2018-01-01", "stop": "2018-12-31"}}
import pyaerocom as pya

print(pya.const.CACHEDIR)
# exit()
filters = {
"variables": {"include": ["PM10"]},
"time_bounds": {"start_include": [("2018-01-01 0:0:0", "2018-01-31 0:0:0")]},
}
EEATimeseriesReader(
"/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/renamed/",
filters=filters,
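The updated __main__ block above also shows the filter format the reader expects: species go under variables.include, and the time window goes under time_bounds.start_include as (start, end) strings in "%Y-%m-%d %H:%M:%S" format. A hedged usage sketch, assuming the import path follows the file location and using a placeholder data directory:

    from pyaro_readers.eeareader.EEATimeseriesReader import EEATimeseriesReader

    filters = {
        "variables": {"include": ["PM10"]},
        "time_bounds": {
            "start_include": [("2018-01-01 00:00:00", "2018-01-31 00:00:00")]
        },
    }
    # The directory of postprocessed parquet files is a placeholder path.
    reader = EEATimeseriesReader("/path/to/eea/renamed/", filters=filters)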
110 changes: 48 additions & 62 deletions src/pyaro_readers/eeareader/eeadownloader.py
@@ -1,18 +1,14 @@
import requests
import pprint

import polars as pl
import typer
from typing_extensions import Annotated

# from pyarrow.dataset import dataset
import json
import pathlib
from pathlib import Path
import json
import csv


import toml
import os

import threading

from tqdm import tqdm

@@ -33,8 +29,8 @@ class EEADownloader:
URL_ENDPOINT = "urls"
URL_POLLUTANT = "http://dd.eionet.europa.eu/vocabulary/aq/pollutant/"

METADATFILE = "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/metadata.csv"
DATAFILE = "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data.toml"
METADATFILE = Path(__file__).parent / "metadata.csv"
DATAFILE = Path(__file__).parent / "data.toml"

DEFAULT_POLLUTANTS = [
"SO2",
@@ -161,12 +157,14 @@ def download_default(
threads = []

errorfile = open("errors.txt", "w")
pbar = tqdm(countries, desc="Countries")
pbar = tqdm(countries, desc="Countries", disable=None)
for country in pbar:
# print(f"Running for {country}")
pbar.set_description(f"{country}")
for poll in tqdm(
self.DEFAULT_POLLUTANTS[:2], desc="Pollutants", leave=False
self.DEFAULT_POLLUTANTS[:2],
desc="Pollutants",
leave=False,
disable=None,
):
full_loc = save_loc / poll / country

@@ -178,24 +176,6 @@
"source": "Api",
}
self.download_and_save(request, full_loc)
# try:
# self.download_and_save(request, full_loc)
# except:
# errorfile.write(f"Failed for {country}, {poll}")
# continue

# thread = threading.Thread(
# target=self.download_and_save,
# args=(
# request,
# full_loc,
# ),
# )
# thread.start()
# threads.append(thread)

# for thread in threads:
# thread.join()

errorfile.close()

@@ -236,27 +216,26 @@ def postprocess_all_files(self, from_folder: Path, to_folder: Path) -> None:
to_folder.mkdir(parents=True, exist_ok=True)
conversion_error = open(to_folder / "errors.txt", "w")
error_n = 0
for poll in tqdm(polls, desc="Pollutant"):
for poll in tqdm(polls, desc="Pollutant", disable=None):
countries = [
str(x).split("/")[-1]
for x in (from_folder / poll).iterdir()
if x.is_dir()
]
for country in tqdm(countries, desc="Country", leave=False):
for country in tqdm(countries, desc="Country", leave=False, disable=None):
folder = from_folder / poll / country
new_folder = to_folder / poll / country

if not new_folder.is_dir():
new_folder.mkdir(parents=True, exist_ok=True)

files = folder.glob("*.parquet")
for file in tqdm(files, desc="Files", leave=False):
for file in tqdm(files, desc="Files", leave=False, disable=None):
try:
df = self._postprocess_file(file, metadata=metadata)
df.write_parquet(new_folder / file.name)

except Exception as e:
# raise ValueError(f"{file} failed with {e}")
error_n += 1
conversion_error.write(
f"{error_n}: Error in converting {file} due to {e}\n"
@@ -265,45 +244,52 @@ def postprocess_all_files(self, from_folder: Path, to_folder: Path) -> None:
print(f"Finished with {error_n} errors")
conversion_error.close()

# with open(to_folder / "metadata.csv", "w") as f:
# f.write("filename, lon, lat, alt, Pollutant, CountryCode, StationName \n")
# for entry in metadata:
# item = metadata[entry]
# f.write(
# f'{entry}, {item["lon"]}, {item["lat"]}, {item["alt"]}, {item["Pollutant"]}, {item["CountryCode"]}, {item["StationName"]} \n'
# )
# new_filename = file.parent / f"processed_{file.name}"
# df.write_parquet(new_filename)


@app.command(name="download")
def download(save_loc: Path):
@app.command(
name="download",
help="Downloads the data to a given folder. Data will be organised in folders corresponding to pollutant and country code",
)
def download(
save_loc: Annotated[
Path, typer.Argument(help="Location where the data will be downloaded to")
]
):
eead = EEADownloader()
eead.download_default(save_loc)


@app.command(name="postprocess")
def postprocess(from_folder: Path, to_folder: Path):
@app.command(
name="postprocess",
help="Postprocesses the data to make the reading by pyaro faster",
)
def postprocess(
from_folder: Annotated[
Path, typer.Argument(help="The folder where the original data is found")
],
to_folder: Annotated[
Path, typer.Argument(help="Folder where the processed data will be stored")
],
):
eead = EEADownloader()
eead.postprocess_all_files(from_folder, to_folder)


if __name__ == "__main__":

# app()
app()

eead = EEADownloader()
# eead.download_default(
# eead = EEADownloader()
# # eead.download_default(
# # Path(
# # "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data"
# # )
# # )

# eead.postprocess_all_files(
# Path(
# "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data"
# )
# ),
# Path(
# "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/renamed"
# ),
# )

eead.postprocess_all_files(
Path(
"/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data"
),
Path(
"/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/renamed"
),
)
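
A note on the tqdm(..., disable=None) calls introduced in both files: passing disable=None makes tqdm draw the progress bar only when the output stream is a TTY, so batch runs with redirected output do not fill their logs with bar updates. A small illustration:

    from tqdm import tqdm

    for _ in tqdm(range(100), desc="Files", disable=None):
        pass  # bar shown interactively, suppressed when output is piped to a file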
