diff --git a/src/pyaro_readers/eeareader/EEATimeseriesReader.py b/src/pyaro_readers/eeareader/EEATimeseriesReader.py
index 0af6b6f..6726593 100644
--- a/src/pyaro_readers/eeareader/EEATimeseriesReader.py
+++ b/src/pyaro_readers/eeareader/EEATimeseriesReader.py
@@ -22,7 +22,7 @@
 FLAGS_VALID = {-99: False, -1: False, 1: True, 2: False, 3: False, 4: True}
 VERIFIED_LVL = [1, 2, 3]

-DATA_TOML = "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data.toml"
+DATA_TOML = Path(__file__).parent / "data.toml"

 FILL_COUNTRY_FLAG = False
 TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -57,7 +57,6 @@
     start_times="Start",
     end_times="End",
     flags="Validity",
-    #countries="country",
 )


@@ -78,11 +77,14 @@ def __init__(
         filter_time = False
         if "time_bounds" in filters:
             if "start_include" in filters["time_bounds"]:
-                start_date = datetime.strptime(filters["time_bounds"]["start_include"][0][0], TIME_FORMAT)
-                end_date = datetime.strptime(filters["time_bounds"]["start_include"][0][1], TIME_FORMAT)
+                start_date = datetime.strptime(
+                    filters["time_bounds"]["start_include"][0][0], TIME_FORMAT
+                )
+                end_date = datetime.strptime(
+                    filters["time_bounds"]["start_include"][0][1], TIME_FORMAT
+                )
                 filter_time = True
-
         if len(species) == 0:
             raise ValueError(
                 f"As of now, you have to give the species you want to read in filter.variables.include"
             )
@@ -96,11 +98,19 @@ def __init__(
             )
         for s in species:
             files = self._create_file_list(filename, s)
+
             if filter_time:
-                datapoints = self._filter_dates(polars.scan_parquet(files), (start_date, end_date)).select(polars.len()).collect()[0, 0]
+                datapoints = (
+                    self._filter_dates(
+                        polars.scan_parquet(files), (start_date, end_date)
+                    )
+                    .select(polars.len())
+                    .collect()[0, 0]
+                )
             else:
-                datapoints = polars.scan_parquet(files).select(polars.len()).collect()[0, 0]
-
+                datapoints = (
+                    polars.scan_parquet(files).select(polars.len()).collect()[0, 0]
+                )

             array = np.empty(datapoints, np.dtype(DTYPES))

@@ -109,34 +119,33 @@ def __init__(

             current_idx = 0

-            # if filter_time:
-            #     df = self._filter_dates(polars.scan_parquet(files), (start_date, end_date)).collect()
-            # else:
-            #     df = polars.scan_parquet(files).collect()
-
-            # for key in tqdm(PARQUET_FIELDS):
-            #     array[key] = df.get_column(PARQUET_FIELDS[key]).to_numpy()
-
+            for file in tqdm(files, disable=None):
-
-            for file in tqdm(files):
-
+                # Filters by time
                 if filter_time:
-                    lf = self._filter_dates(polars.read_parquet(file), (start_date, end_date))
+                    lf = self._filter_dates(
+                        polars.read_parquet(file), (start_date, end_date)
+                    )
                     if lf.is_empty():
-                        #print(f"Empty filter for {file}")
                         continue
                 else:
                     lf = polars.read_parquet(file)

+                # Filters out invalid data
+                lf = lf.filter(polars.col(PARQUET_FIELDS["flags"]) > 0)
+
+                file_datapoints = lf.select(polars.len())[0, 0]
-                file_datapoints = lf.select(polars.len())[0,0]#.collect()
-                df = lf#.collect()
+                if file_datapoints == 0:
+                    continue
+                df = lf

                 file_unit = df.row(0)[df.get_column_index("Unit")]

                 for key in PARQUET_FIELDS:
-                    array[key][current_idx : current_idx + file_datapoints] = df.get_column(PARQUET_FIELDS[key]).to_numpy()
+                    array[key][current_idx : current_idx + file_datapoints] = (
+                        df.get_column(PARQUET_FIELDS[key]).to_numpy()
+                    )

                 current_idx += file_datapoints

@@ -147,20 +156,30 @@ def __init__(
                     raise ValueError(
                         f"Found multiple units ({file_unit} and {species_unit}) for same species {s}"
                     )
-
                 metadatarow = df.row(0)

                 station_fields = {
-                    "station": metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])],
-                    "longitude": metadatarow[df.get_column_index(PARQUET_FIELDS["longitudes"])],
-                    "latitude": metadatarow[df.get_column_index(PARQUET_FIELDS["latitudes"])],
-                    "altitude": metadatarow[df.get_column_index(PARQUET_FIELDS["altitudes"])],
+                    "station": metadatarow[
+                        df.get_column_index(PARQUET_FIELDS["stations"])
+                    ],
+                    "longitude": metadatarow[
+                        df.get_column_index(PARQUET_FIELDS["longitudes"])
+                    ],
+                    "latitude": metadatarow[
+                        df.get_column_index(PARQUET_FIELDS["latitudes"])
+                    ],
+                    "altitude": metadatarow[
+                        df.get_column_index(PARQUET_FIELDS["altitudes"])
+                    ],
                     "country": metadatarow[df.get_column_index("country")],
                     "url": "",
-                    "long_name": metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])],
+                    "long_name": metadatarow[
+                        df.get_column_index(PARQUET_FIELDS["stations"])
+                    ],
                 }
-                self._stations[metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])]] = Station(station_fields)
-
+                self._stations[
+                    metadatarow[df.get_column_index(PARQUET_FIELDS["stations"])]
+                ] = Station(station_fields)

             data = NpStructuredData(variable=s, units=species_unit)
             data.set_data(variable=s, units=species_unit, data=array)
@@ -179,12 +198,17 @@ def _get_species_ids(self, species: list[str]) -> list[int]:
             if poll[key] in species:
                 ids.append(key)
         return ids
-
-    def _filter_dates(self, lf: polars.LazyFrame | polars.DataFrame, dates: tuple[datetime]) -> polars.LazyFrame | polars.DataFrame:
+
+    def _filter_dates(
+        self, lf: polars.LazyFrame | polars.DataFrame, dates: tuple[datetime]
+    ) -> polars.LazyFrame | polars.DataFrame:
        if dates[0] >= dates[1]:
-            raise ValueError(f"Error when filtering data. Last date {dates[1]} must be larger than the first {dates[0]}")
-        #return lf.with_columns(polars.col(PARQUET_FIELDS["start_times"]).str.strptime(polars.Date)).filter(polars.col(PARQUET_FIELDS["start_times"]).is_between(dates[0], dates[1]))
-        return lf.filter(polars.col(PARQUET_FIELDS["start_times"]).is_between(dates[0], dates[1]))
+            raise ValueError(
+                f"Error when filtering data. Last date {dates[1]} must be larger than the first {dates[0]}"
+            )
+        return lf.filter(
+            polars.col(PARQUET_FIELDS["start_times"]).is_between(dates[0], dates[1])
+        )

     def _unfiltered_data(self, varname) -> Data:
         return self._data[varname]
@@ -214,7 +238,14 @@ def url(self):

 if __name__ == "__main__":
-    filters = {"variables": {"include": ["PM10"]}, "time": {"start": "2018-01-01", "stop": "2018-12-31"}}
+    import pyaerocom as pya
+
+    print(pya.const.CACHEDIR)
+    # exit()
+    filters = {
+        "variables": {"include": ["PM10"]},
+        "time_bounds": {"start_include": [("2018-01-01 0:0:0", "2018-01-31 0:0:0")]},
+    }
     EEATimeseriesReader(
         "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/renamed/",
         filters=filters,
     )
diff --git a/src/pyaro_readers/eeareader/eeadownloader.py b/src/pyaro_readers/eeareader/eeadownloader.py
index 05c6fef..7774cee 100644
--- a/src/pyaro_readers/eeareader/eeadownloader.py
+++ b/src/pyaro_readers/eeareader/eeadownloader.py
@@ -1,18 +1,14 @@
 import requests
-import pprint
+
 import polars as pl
 import typer
+from typing_extensions import Annotated

-# from pyarrow.dataset import dataset
-import json
-import pathlib
 from pathlib import Path
-import json
-import csv
+
+
 import toml
-import os
-import threading

 from tqdm import tqdm

@@ -33,8 +29,8 @@ class EEADownloader:
     URL_ENDPOINT = "urls"
     URL_POLLUTANT = "http://dd.eionet.europa.eu/vocabulary/aq/pollutant/"

-    METADATFILE = "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/metadata.csv"
-    DATAFILE = "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data.toml"
+    METADATFILE = Path(__file__).parent / "metadata.csv"
+    DATAFILE = Path(__file__).parent / "data.toml"

     DEFAULT_POLLUTANTS = [
         "SO2",
@@ -161,12 +157,14 @@ def download_default(
         threads = []
         errorfile = open("errors.txt", "w")

-        pbar = tqdm(countries, desc="Countries")
+        pbar = tqdm(countries, desc="Countries", disable=None)
         for country in pbar:
-            # print(f"Running for {country}")
             pbar.set_description(f"{country}")
             for poll in tqdm(
-                self.DEFAULT_POLLUTANTS[:2], desc="Pollutants", leave=False
+                self.DEFAULT_POLLUTANTS[:2],
+                desc="Pollutants",
+                leave=False,
+                disable=None,
             ):
                 full_loc = save_loc / poll / country

@@ -178,24 +176,6 @@ def download_default(
                     "source": "Api",
                 }
                 self.download_and_save(request, full_loc)
-                # try:
-                #     self.download_and_save(request, full_loc)
-                # except:
-                #     errorfile.write(f"Failed for {country}, {poll}")
-                #     continue
-
-                # thread = threading.Thread(
-                #     target=self.download_and_save,
-                #     args=(
-                #         request,
-                #         full_loc,
-                #     ),
-                # )
-                # thread.start()
-                # threads.append(thread)
-
-                # for thread in threads:
-                #     thread.join()

         errorfile.close()

@@ -236,13 +216,13 @@ def postprocess_all_files(self, from_folder: Path, to_folder: Path) -> None:
         to_folder.mkdir(parents=True, exist_ok=True)
         conversion_error = open(to_folder / "errors.txt", "w")
         error_n = 0
-        for poll in tqdm(polls, desc="Pollutant"):
+        for poll in tqdm(polls, desc="Pollutant", disable=None):
             countries = [
                 str(x).split("/")[-1]
                 for x in (from_folder / poll).iterdir()
                 if x.is_dir()
             ]
-            for country in tqdm(countries, desc="Country", leave=False):
+            for country in tqdm(countries, desc="Country", leave=False, disable=None):
                 folder = from_folder / poll / country

                 new_folder = to_folder / poll / country
@@ -250,13 +230,12 @@ def postprocess_all_files(self, from_folder: Path, to_folder: Path) -> None:
                 new_folder.mkdir(parents=True, exist_ok=True)

                 files = folder.glob("*.parquet")
-                for file in tqdm(files, desc="Files", leave=False):
+                for file in tqdm(files, desc="Files", leave=False, disable=None):
                     try:
                         df = self._postprocess_file(file, metadata=metadata)
                         df.write_parquet(new_folder / file.name)
                     except Exception as e:
-                        # raise ValueError(f"{file} failed with {e}")
                         error_n += 1
                         conversion_error.write(
                             f"{error_n}: Error in converting {file} due to {e}\n"
                         )
@@ -265,45 +244,52 @@
         print(f"Finished with {error_n} errors")
         conversion_error.close()

-        # with open(to_folder / "metadata.csv", "w") as f:
-        #     f.write("filename, lon, lat, alt, Pollutant, CountryCode, StationName \n")
-        #     for entry in metadata:
-        #         item = metadata[entry]
-        #         f.write(
-        #             f'{entry}, {item["lon"]}, {item["lat"]}, {item["alt"]}, {item["Pollutant"]}, {item["CountryCode"]}, {item["StationName"]} \n'
-        #         )
-        # new_filename = file.parent / f"processed_{file.name}"
-        # df.write_parquet(new_filename)
-
-@app.command(name="download")
-def download(save_loc: Path):
+@app.command(
+    name="download",
+    help="Downloads the data to a given folder. Data will be ordered in folders corresponding to pollutant and country code",
+)
+def download(
+    save_loc: Annotated[
+        Path, typer.Argument(help="Location where the data will be downloaded to")
+    ]
+):
     eead = EEADownloader()
     eead.download_default(save_loc)


-@app.command(name="postprocess")
-def postprocess(from_folder: Path, to_folder: Path):
+@app.command(
+    name="postprocess",
+    help="Postprocesses the data to make the reading by pyaro faster",
+)
+def postprocess(
+    from_folder: Annotated[
+        Path, typer.Argument(help="The folder where the original data is found")
+    ],
+    to_folder: Annotated[
+        Path, typer.Argument(help="Folder where the processed data will be stored")
+    ],
+):
     eead = EEADownloader()
     eead.postprocess_all_files(from_folder, to_folder)


 if __name__ == "__main__":
-    # app()
+    app()

-    eead = EEADownloader()
-    # eead.download_default(
+    # eead = EEADownloader()
+    # # eead.download_default(
+    # #     Path(
+    # #         "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data"
+    # #     )
+    # # )
+
+    # eead.postprocess_all_files(
     #     Path(
     #         "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data"
-    #     )
+    #     ),
+    #     Path(
+    #         "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/renamed"
+    #     ),
     # )
-
-    eead.postprocess_all_files(
-        Path(
-            "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/data"
-        ),
-        Path(
-            "/home/danielh/Documents/pyaerocom/pyaro-readers/src/pyaro_readers/eeareader/renamed"
-        ),
-    )
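
Usage sketch, not part of the patch: a minimal end-to-end flow chaining the two CLI steps with the reader. It only uses calls that appear in this diff (EEADownloader.download_default, EEADownloader.postprocess_all_files, and EEATimeseriesReader with a time_bounds filter); the module paths assume the pyaro-readers src layout is installed as a package, and both data directories are placeholders.

    from pathlib import Path

    from pyaro_readers.eeareader.eeadownloader import EEADownloader
    from pyaro_readers.eeareader.EEATimeseriesReader import EEATimeseriesReader

    raw = Path("eea_raw")  # placeholder download target
    processed = Path("eea_renamed")  # placeholder postprocess target

    # Fetch the raw parquet files, then rewrite them so the reader can scan them quickly.
    eead = EEADownloader()
    eead.download_default(raw)
    eead.postprocess_all_files(raw, processed)

    # Read one species over a bounded time window, mirroring the __main__ block above.
    filters = {
        "variables": {"include": ["PM10"]},
        "time_bounds": {"start_include": [("2018-01-01 0:0:0", "2018-01-31 0:0:0")]},
    }
    reader = EEATimeseriesReader(str(processed), filters=filters)

The first two steps are also exposed on the command line through the typer app as the download and postprocess commands.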