diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 69e5cf35..1eacb69d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,6 +9,9 @@ on: schedule: - cron: "0 12 * * 1" +env: + OPENAQ_API_KEY: "${{ secrets.OPENAQ_API_KEY }}" + jobs: test: name: Test (Py ${{ matrix.python-version }}) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1a1cf822..ceea62ec 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,6 +6,7 @@ repos: exclude: tdump\.[0-9]* - id: end-of-file-fixer - id: check-docstring-first + exclude: monetio/obs/openaq_v2\.py - id: check-yaml - repo: https://github.com/asottile/pyupgrade diff --git a/monetio/__init__.py b/monetio/__init__.py index cb14bf8b..6a3088f1 100644 --- a/monetio/__init__.py +++ b/monetio/__init__.py @@ -1,6 +1,19 @@ from . import grids from .models import camx, cmaq, fv3chem, hysplit, hytraj, ncep_grib, pardump, prepchem, raqms -from .obs import aeronet, airnow, aqs, cems, crn, improve, ish, ish_lite, nadp, openaq, pams +from .obs import ( + aeronet, + airnow, + aqs, + cems, + crn, + improve, + ish, + ish_lite, + nadp, + openaq, + openaq_v2, + pams, +) from .profile import geoms, gml_ozonesonde, icartt, tolnet from .sat import goes @@ -29,6 +42,7 @@ "ish_lite", "nadp", "openaq", + "openaq_v2", "pams", # # profile obs diff --git a/monetio/obs/__init__.py b/monetio/obs/__init__.py index 58ce51f6..2add7e9f 100644 --- a/monetio/obs/__init__.py +++ b/monetio/obs/__init__.py @@ -10,6 +10,7 @@ ish_lite, nadp, openaq, + openaq_v2, pams, ) @@ -25,6 +26,7 @@ "cems_mod", "nadp", "openaq", + "openaq_v2", "pams", ] diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index acd8b724..9ca08601 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -1,212 +1,519 @@ -"""Short summary. +"""Get v1 (government-only) OpenAQ data from AWS. - Attributes - ---------- - url : type - Description of attribute `url`. - dates : type - Description of attribute `dates`. - df : type - Description of attribute `df`. - daily : type - Description of attribute `daily`. - objtype : type - Description of attribute `objtype`. - filelist : type - Description of attribute `filelist`. - monitor_file : type - Description of attribute `monitor_file`. - __class__ : type - Description of attribute `__class__`. - monitor_df : type - Description of attribute `monitor_df`. - savecols : type - Description of attribute `savecols`. - """ +https://openaq.org/ + +https://openaq-fetches.s3.amazonaws.com/index.html +""" import json +import sys +import warnings import pandas as pd from numpy import nan +_PY39_PLUS = sys.version_info >= (3, 9) + +_URL_CAP_RANDOM_SAMPLE = False # if false, take from end of list +_URL_CAP = None # set to int to limit number of files loaded for testing + + +def add_data(dates, *, n_procs=1, wide_fmt=True): + """Add OpenAQ data from the OpenAQ S3 bucket. -def add_data(dates, n_procs=1): - """add openaq data from the amazon s3 server. + https://openaq-fetches.s3.amazonaws.com + + Note that the source files are daily, so requesting a single day or single hour within it + will take the same amount of time. Parameters ---------- - dates : pd.DateTimeIndex or list of datatime objects - this is a list of dates to download - n_procs : type - Description of parameter `n_procs`. + dates : pandas.DateTimeIndex or list of datetime objects + Dates of data to fetch. + n_procs : int + For Dask. Returns ------- - type - Description of returned object. 
- + pandas.DataFrame """ a = OPENAQ() - return a.add_data(dates, num_workers=n_procs) + return a.add_data(dates, num_workers=n_procs, wide_fmt=wide_fmt) + + +def read_json(fp_or_url, *, verbose=False): + """Read a JSON file from the OpenAQ S3 bucket, returning dataframe in original long format. + + Parameters + ---------- + fp_or_url : str or path-like + File path or URL. + + Returns + ------- + pandas.DataFrame + """ + from time import perf_counter + + import numpy as np + + tic = perf_counter() + + df = pd.read_json(fp_or_url, lines=True) + + # "attribution" is complex to deal with, just drop for now + # Seems like it can be null or a list of attribution dicts with "name" and "url" + df = df.drop(columns="attribution") + + # Expand nested columns + # Multiple ways to do this, e.g. + # - pd.DataFrame(df.date.tolist()) + # Seems to be fastest for one, works if only one level of nesting + # - pd.json_normalize(df["date"]) + # - pd.json_normalize(json.loads(df["date"].to_json(orient="records"))) + # With this method, can apply to multiple columns at once + df = df.dropna(subset=["coordinates"]) + to_expand = ["date", "averagingPeriod", "coordinates"] + new = pd.json_normalize(json.loads(df[to_expand].to_json(orient="records"))) + + # Convert to time + # If we just apply `pd.to_datetime`, we get + # - utc -> datetime64[ns, UTC] + # - local -> obj (datetime.datetime with tzinfo=tzoffset(None, ...)) + # + # But we don't need localization, we just want non-localized UTC time and UTC offset. + # + # To get the UTC time, e.g.: + # - pd.to_datetime(new["date.utc"]).dt.tz_localize(None) + # These are comparable but this seems slightly faster. + # - pd.to_datetime(new["date.utc"].str.slice(None, -1)) + # + # To get UTC offset + # (we can't subtract the two time arrays since different dtypes), e.g.: + # - pd.to_timedelta(new["date.local"].str.slice(-6, None)+":00") + # Seems to be slightly faster + # - pd.to_datetime(new["date.local"]).apply(lambda t: t.utcoffset()) + time = pd.to_datetime(new["date.utc"]).dt.tz_localize(None) + utcoffset = pd.to_timedelta(new["date.local"].str.slice(-6, None) + ":00") + time_local = time + utcoffset + + # Convert averaging period to timedelta + value = new["averagingPeriod.value"] + units = new["averagingPeriod.unit"] + unique_units = units.dropna().unique() + averagingPeriod = pd.Series(np.full(len(new), nan, dtype="timedelta64[ns]")) + for unit in unique_units: + is_unit = units == unit + averagingPeriod.loc[is_unit] = pd.to_timedelta(value[is_unit], unit=unit) + + # Apply new columns + df = df.drop(columns=to_expand).assign( + time=time, + time_local=time_local, + utcoffset=utcoffset, + latitude=new["coordinates.latitude"], + longitude=new["coordinates.longitude"], + averagingPeriod=averagingPeriod, + ) + + if verbose: + print(f"{perf_counter() - tic:.3f}s") + + return df + + +def read_json2(fp_or_url, *, verbose=False): + """Read a JSON file from the OpenAQ S3 bucket, returning dataframe in original long format. + + This provides 'attribution', while :func:`read_json` does not. + + Parameters + ---------- + fp_or_url : str or path-like + File path or URL. 
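+        Currently only ``http(s)`` and ``s3`` URLs are supported; other inputs raise ``NotImplementedError``.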
+ + Returns + ------- + pandas.DataFrame + """ + import datetime + from time import perf_counter + + tic = perf_counter() + + if isinstance(fp_or_url, str) and fp_or_url.startswith(("http", "s3")): + import requests + + if fp_or_url.startswith("s3"): + fp_or_url = fp_or_url.replace( + "s3://openaq-fetches/", "https://openaq-fetches.s3.amazonaws.com/" + ) + + r = requests.get(fp_or_url, stream=True, timeout=2) + r.raise_for_status() + else: + raise NotImplementedError + + names = [ + "time", + "utcoffset", + "latitude", + "longitude", + # + "parameter", + "value", + "unit", + # + "averagingPeriod", + # + "location", + "city", + "country", + # + "attribution", + "sourceName", + "sourceType", + "mobile", + ] + rows = [] + for line in r.iter_lines(): + if line: + data = json.loads(line) + coords = data.get("coordinates") + if coords is None: + if verbose: + print("Skipping row since no coords:", data) + continue + + # Time + time = datetime.datetime.fromisoformat(data["date"]["utc"][:-1]) + time_local_str = data["date"]["local"] + h = int(time_local_str[-6:-3]) + m = int(time_local_str[-2:]) + utcoffset = datetime.timedelta(hours=h, minutes=m) + + # Averaging period + ap = data.get("averagingPeriod") + if ap is not None: + val = data["averagingPeriod"]["value"] + unit = data["averagingPeriod"]["unit"] + averagingPeriod = datetime.timedelta(**{unit: val}) + else: + averagingPeriod = None + + # Attribution + attrs = data.get("attribution") + if attrs is not None: + attr_names = [a["name"] for a in attrs] + if verbose: + if len(attr_names) > 1: + print(f"Taking first of {len(attr_names)}:", attr_names) + attr_name = attr_names[0] # Just the (hopefully) primary one + + rows.append( + ( + time, + utcoffset, + data["coordinates"]["latitude"], + data["coordinates"]["longitude"], + # + data["parameter"], + data["value"], + data["unit"], + # + averagingPeriod, + # + data["location"], + data["city"], + data["country"], + # + attr_name, + data["sourceName"], + data["sourceType"], + data["mobile"], + ) + ) + + df = pd.DataFrame(rows, columns=names) + + df["time_local"] = df["time"] + df["utcoffset"] + + if verbose: + print(f"{perf_counter() - tic:.3f}s") + + return df class OPENAQ: - def __init__(self): + NON_MOLEC_PARAMS = [ + "pm1", + "pm25", + "pm4", + "pm10", + "bc", + ] + """Parameters that are not molecules and should be in µg/m³ units.""" + + PPM_TO_UGM3 = { + "o3": 1990, + "co": 1160, + "no2": 1900, + "no": 1240, + "so2": 2650, + "ch4": 664, + "co2": 1820, + } + """Conversion factors from ppmv to µg/m³. + + Based on + + - air average molecular weight: 29 g/mol + - air density: 1.2 kg m -3 + + and rounded to 3 significant figures. + """ + + def __init__(self, *, engine="pandas"): + """ + Parameters + ---------- + engine : str, optional + _description_, by default "pandas" + + Raises + ------ + ValueError + _description_ + """ + from functools import partial + import s3fs self.fs = s3fs.S3FileSystem(anon=True) self.s3bucket = "openaq-fetches/realtime" + if engine == "pandas": + self.read = partial(read_json, verbose=False) + elif engine == "python": + self.read = partial(read_json2, verbose=False) + else: + raise ValueError("engine must be 'pandas' or 'python'.") + self.engine = engine + def _get_available_days(self, dates): + """ + Parameters + ---------- + dates : datetime-like or list of datetime-like + ``pd.to_datetime`` will be applied. 
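+
+        Returns
+        -------
+        pandas.Series
+            Unique dates, among those requested, that are available in the bucket.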
+ """ + # Get all day folders folders = self.fs.ls(self.s3bucket) - days = [j.split("/")[2] for j in folders] - avail_dates = pd.to_datetime(days, format="%Y-%m-%d", errors="coerce") - dates = pd.to_datetime(dates).floor(freq="D") - d = pd.Series(dates, name="dates").drop_duplicates() - ad = pd.Series(avail_dates, name="dates") - return pd.merge(d, ad, how="inner") + days = [folder.split("/")[2] for folder in folders] + dates_available = pd.Series( + pd.to_datetime(days, format=r"%Y-%m-%d", errors="coerce"), + name="dates", + ) + + # Filter by requested dates + dates_requested = pd.Series( + pd.to_datetime(dates).floor(freq="D"), + name="dates", + ).drop_duplicates() + + dates_have = pd.merge(dates_available, dates_requested, how="inner")["dates"] + if dates_have.empty: + raise ValueError(f"No data available for requested dates: {dates_requested}.") + + return dates_have def _get_files_in_day(self, date): - files = self.fs.ls("{}/{}".format(self.s3bucket, date.strftime("%Y-%m-%d"))) + """ + Parameters + ---------- + date + datetime-like object with ``.strftime`` method. + """ + sdate = date.strftime(r"%Y-%m-%d") + files = self.fs.ls(f"{self.s3bucket}/{sdate}") return files def build_urls(self, dates): - d = self._get_available_days(dates) - urls = pd.Series([], name="url") - for i in d.dates: - files = self._get_files_in_day(i) - furls = pd.Series( - [ - f.replace("openaq-fetches", "https://openaq-fetches.s3.amazonaws.com") - for f in files - ], - name="url", - ) - urls = pd.merge(urls, furls, how="outer") - return urls.url.values + """ + Parameters + ---------- + dates : datetime-like or list of datetime-like + ``pd.to_datetime`` will be applied. + """ + dates_ = self._get_available_days(dates) + urls = [] + for date in dates_: + files = self._get_files_in_day(date) + urls.extend(f"s3://{f}" for f in files) + return urls + + def add_data(self, dates, *, num_workers=1, wide_fmt=True): + """Get data for `dates`, using `num_workers` Dask workers. + + Parameters + ---------- + num_workers : int + Number of Dask workers to use to read the JSON files. + wide_fmt : bool + If True, return data in wide format + (each parameter gets its own column, + as opposed to long format with 'parameter', 'value', and 'units' columns). + Accordingly, convert units to consistent units + (ppmv for molecules, µg/m³ for others) + and rename columns to reflect units. 
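+
+        Returns
+        -------
+        pandas.DataFrame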
+ """ + import hashlib - def add_data(self, dates, num_workers=1): import dask import dask.dataframe as dd - urls = self.build_urls(dates).tolist() - # z = dd.read_json(urls).compute() - dfs = [dask.delayed(self.read_json)(f) for f in urls] - dff = dd.from_delayed(dfs) - z = dff.compute(num_workers=num_workers) - z.coordinates.replace(to_replace=[None], value=nan, inplace=True) - z = z.dropna().reset_index(drop=True) - js = json.loads(z[["coordinates", "date"]].to_json(orient="records")) - dff = pd.io.json.json_normalize(js) - dff.columns = dff.columns.str.split(".").str[1] - dff.rename({"local": "time_local", "utc": "time"}, axis=1, inplace=True) - - dff["time"] = pd.to_datetime(dff.time) - dff["utcoffset"] = pd.to_datetime(dff.time_local).apply(lambda x: x.utcoffset()) - zzz = z.join(dff).drop(columns=["coordinates", "date", "attribution", "averagingPeriod"]) - zzz = self._fix_units(zzz) - assert ( - zzz[~zzz.parameter.isin(["pm25", "pm4", "pm10", "bc"])].unit.dropna() == "ppm" - ).all() - zp = self._pivot_table(zzz) - zp["siteid"] = ( - zp.country - + "_" - + zp.latitude.round(3).astype(str) - + "N_" - + zp.longitude.round(3).astype(str) - + "E" - ) + dates = pd.to_datetime(dates) + if isinstance(dates, pd.Timestamp): + dates = pd.DatetimeIndex([dates]) + dates = dates.sort_values() - zp["time"] = zp.time.dt.tz_localize(None) - zp["time_local"] = zp["time"] + zp["utcoffset"] - - return zp.loc[zp.time >= dates.min()] - - def read_json(self, url): - return pd.read_json(url, lines=True).dropna().sort_index(axis=1) - - # def read_json(self, url): - # df = pd.read_json(url, lines=True).dropna() - # df.coordinates.replace(to_replace=[None], - # value=pd.np.nan, - # inplace=True) - # df = df.dropna(subset=['coordinates']) - # # df = self._parse_latlon(df) - # # json_struct = json.loads(df.coordinates.to_json(orient='records')) - # # df_flat = pd.io.json.json_normalize(json_struct) - # # df = self._parse_datetime(df) - # # df = self._fix_units(df) - # # df = self._pivot_table(df) - # return df - - def _parse_latlon(self, df): - # lat = vectorize(lambda x: x['latitude']) - # lon = vectorize(lambda x: x['longitude']) - def lat(x): - return x["latitude"] - - def lon(x): - return x["longitude"] - - df["latitude"] = df.coordinates.apply(lat) - df["longitude"] = df.coordinates.apply(lon) - return df.drop(columns="coordinates") - - def _parse_datetime(self, df): - def utc(x): - return pd.to_datetime(x["utc"]) - - def local(x): - return pd.to_datetime(x["local"]) - - df["time"] = df.date.apply(utc) - df["time_local"] = df.date.apply(local) - return df.drop(columns="date") - - def _fix_units(self, df): - df.loc[df.value <= 0] = nan - # For a certain parameter, different site-times may have different units. - # https://docs.openaq.org/docs/parameters - # These conversion factors are based on - # - air average molecular weight: 29 g/mol - # - air density: 1.2 kg m -3 - # rounded to 3 significant figures. 
- fs = {"co": 1160, "o3": 1990, "so2": 2650, "no2": 1900, "ch4": 664, "no": 1240} - for vn, f in fs.items(): - is_ug = (df.parameter == vn) & (df.unit == "µg/m³") - df.loc[is_ug, "value"] /= f - df.loc[is_ug, "unit"] = "ppm" - return df + # Get URLs + urls = self.build_urls(dates) + print(f"Will load {len(urls)} files.") + if len(urls) > 0: + print(urls[0]) + if len(urls) > 2: + print("...") + if len(urls) > 1: + print(urls[-1]) + + if _URL_CAP is not None and len(urls) > _URL_CAP: + if _URL_CAP_RANDOM_SAMPLE: + import random + + urls = random.sample(urls, _URL_CAP) + else: + urls = urls[-_URL_CAP:] - def _pivot_table(self, df): - w = df.pivot_table( - values="value", - index=[ + # Read JSON files + func = self.read + dfs = [dask.delayed(func)(url) for url in urls] + df_lazy = dd.from_delayed(dfs) + df = df_lazy.compute(num_workers=num_workers) + + # Ensure data within requested time window + df = df.loc[(df.time >= dates.min()) & (df.time <= dates.max())] + + # Measurements like air comp shouldn't be negative + non_neg_units = [ + "ng/m3", + "particles/cm³", + "ppb", + "ppm", + "ugm3", + "umol/mol", + "µg/m³", + ] + df.loc[df.unit.isin(non_neg_units) & (df.value < 0), "value"] = nan + # Assume value 0 implies below detection limit + + if wide_fmt: + # Convert to consistent units for molecules (ppmv) + # (For a certain parameter, different site-times may have different units.) + for vn, f in self.PPM_TO_UGM3.items(): + is_ug = (df.parameter == vn) & (df.unit == "µg/m³") + df.loc[is_ug, "value"] /= f + df.loc[is_ug, "unit"] = "ppm" + + # Ensure consistent units + non_molec = self.NON_MOLEC_PARAMS + good = (df[~df.parameter.isin(non_molec)].unit.dropna() == "ppm").all() + if not good: + unique_params = sorted(df.parameter.unique()) + molec = [p for p in unique_params if p not in non_molec] + raise ValueError(f"Expected these species to all be in ppm now: {molec}.") + good = (df[df.parameter.isin(non_molec)].unit.dropna() == "µg/m³").all() + if not good: + raise ValueError(f"Expected these species to all be in µg/m³: {non_molec}.") + + # Determine averaging periods for each parameter + aps = {} + for p, g in df.groupby("parameter"): + aps[p] = g.averagingPeriod.dropna().unique() + mult_ap_lines = [] + for p, ap in aps.items(): + if len(ap) > 1: + counts = df.averagingPeriod.loc[df.parameter == p].dropna().value_counts() + s_counts = ", ".join(f"'{v}' ({n})" for v, n in counts.items()) + mult_ap_lines.append(f"{p!r}: {s_counts}") + if mult_ap_lines: + s_mults = "\n".join(f"- {s}" for s in mult_ap_lines) + warnings.warn( + "Multiple averaging periods for" + f"\n{s_mults}" + "\nWill select data with averaging period 1H. " + "Use wide_fmt=False if you want all data." 
+ ) + + # Pivot to wide format (each parameter gets its own column) + index = [ "time", + "time_local", "latitude", "longitude", - "sourceName", - "sourceType", + "utcoffset", + "location", "city", "country", - "utcoffset", - ], - columns="parameter", - ).reset_index() - w = w.rename( - dict( - co="co_ppm", - o3="o3_ppm", - no2="no2_ppm", - so2="so2_ppm", - ch4="ch4_ppm", - no="no_ppm", - bc="bc_ugm3", - pm25="pm25_ugm3", - pm10="pm10_ugm3", - ), - axis=1, - errors="ignore", - ) - return w + "attribution", # currently only in Python reader + "sourceName", + "sourceType", + "mobile", + # "averagingPeriod", # different parameters may have different standard averaging periods + ] + if self.engine == "pandas": + index.remove("attribution") + + # NOTE: 1H is the most common averaging period by far + # NOTE: seems that some sites have dupe rows with city == "N/A" + na_locations = ["Wampanoag Laboratory"] + df = ( + df[ + (df.averagingPeriod == pd.Timedelta("1H")) + & ~(df.location.isin(na_locations) & (df.city == "N/A")) + ] + .pivot_table( + values="value", + index=index, + columns="parameter", + ) + .reset_index() + ) + df["averagingPeriod"] = pd.Timedelta("1H") # TODO: could just not include + df = df.rename(columns={p: f"{p}_ugm3" for p in self.NON_MOLEC_PARAMS}, errors="ignore") + df = df.rename(columns={p: f"{p}_ppm" for p in self.PPM_TO_UGM3}, errors="ignore") + + # Construct site IDs + # Sometimes, at a given time, there are multiple measurements at the same lat/lon + # with different location names. + # Occasionally, there are rows that appear to actual duplicates + # (e.g. all same except one col is null in one or something) + if _PY39_PLUS: + + def do_hash(b): + return hashlib.sha1(b, usedforsecurity=False).hexdigest() + + else: + + def do_hash(b): + return hashlib.sha1(b).hexdigest() + + # to_hash = df.latitude.astype(str) + " " + df.longitude.astype(str) + to_hash = df.location + " " + df.latitude.astype(str) + " " + df.longitude.astype(str) + df["siteid"] = df.country + "_" + to_hash.str.encode("utf-8").apply(do_hash).str.slice(0, 7) + + return df + + +# Need to make an assumption about NOx MW +OPENAQ.PPM_TO_UGM3["nox"] = OPENAQ.PPM_TO_UGM3["no2"] diff --git a/monetio/obs/openaq_v2.py b/monetio/obs/openaq_v2.py new file mode 100644 index 00000000..d709f3c5 --- /dev/null +++ b/monetio/obs/openaq_v2.py @@ -0,0 +1,611 @@ +"""Get AQ data from the OpenAQ v2 REST API. + +Visit https://docs.openaq.org/docs/getting-started to get an API key +and set environment variable ``OPENAQ_API_KEY`` to use it. + +For example, in Bash: + +.. code-block:: bash + + export OPENAQ_API_KEY="your_api_key_here" + +https://openaq.org/ + +https://api.openaq.org/docs#/v2 +""" + +import functools +import json +import logging +import os +import warnings + +import numpy as np +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + +API_KEY = os.environ.get("OPENAQ_API_KEY", None) +if API_KEY is not None: + API_KEY = API_KEY.strip() + if len(API_KEY) != 64: + warnings.warn(f"API key length is {len(API_KEY)}, expected 64") + +_PPM_TO_UGM3 = { + "o3": 1990, + "co": 1160, + "no2": 1900, + "no": 1240, + "so2": 2650, + "ch4": 664, + "co2": 1820, +} +"""Conversion factors from ppmv to µg/m³. + +Based on + +- air average molecular weight: 29 g/mol +- air density: 1.2 kg m -3 + +and rounded to 3 significant figures. 
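+
+For example, 100 µg/m³ of O3 corresponds to roughly 100 / 1990 ≈ 0.05 ppmv.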
+""" + +# NOx assumption +_PPM_TO_UGM3["nox"] = _PPM_TO_UGM3["no2"] + +_NON_MOLEC_PARAMS = [ + "pm1", + "pm25", + "pm4", + "pm10", + "bc", +] +"""Parameters that are not molecules and should be in µg/m³ units.""" + + +def _api_key_warning(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + if API_KEY is None: + warnings.warn( + "Non-cached requests to the OpenAQ v2 web API will be slow without an API key " + "or requests will fail (HTTP error 401). " + "Obtain one (https://docs.openaq.org/docs/getting-started#api-key) " + "and set your OPENAQ_API_KEY environment variable.", + stacklevel=2, + ) + return func(*args, **kwargs) + + return wrapper + + +_BASE_URL = "https://api.openaq.org" +_ENDPOINTS = { + "locations": "/v2/locations", + "parameters": "/v2/parameters", + "measurements": "/v2/measurements", +} + + +def _consume(endpoint, *, params=None, timeout=10, retry=5, limit=500, npages=None): + """Consume a paginated OpenAQ API endpoint. + + Parameters + ---------- + endpoint : str + API endpoint, e.g. ``'/v2/locations'``, ``'/v2/parameters'``, ``'/v2/measurements'``. + params : dict, optional + Parameters for the GET request to the API. + Don't pass ``limit``, ``page``, or ``offset`` here, since they are covered + by the `limit` and `npages` kwargs. + timeout : float or tuple + Seconds to wait for the server before giving up. Passed to ``requests.get``. + retry : int + Number of times to retry the request if it times out. + limit : int + Max number of results per page. + npages : int, optional + Number of pages to fetch. + By default, try to fetch as many as needed to get all results. + """ + import time + from random import random as rand + + if not endpoint.startswith("/"): + endpoint = "/" + endpoint + if not endpoint.startswith("/v2"): + endpoint = "/v2" + endpoint + url = _BASE_URL + endpoint + + if params is None: + params = {} + + if npages is None: + # Maximize + # "limit + offset must be <= 100_000" + # where offset = limit * (page - 1) + # => limit * page <= 100_000 + # and also page must be <= 6_000 + npages = min(100_000 // limit, 6_000) + + params["limit"] = limit + + headers = { + "Accept": "application/json", + "X-API-Key": API_KEY, + "User-Agent": "monetio", + } + + data = [] + for page in range(1, npages + 1): + params["page"] = page + + tries = 0 + while tries < retry: + logger.debug(f"GET {url} params={params}") + r = requests.get(url, params=params, headers=headers, timeout=timeout) + tries += 1 + if r.status_code == 408: + logger.info(f"request timed out (try {tries}/{retry})") + time.sleep(tries + 0.1 * rand()) + elif r.status_code == 429: + # Note: response headers don't seem to include Retry-After + logger.info(f"rate limited (try {tries}/{retry})") + time.sleep(tries * 5 + 0.2 * rand()) + else: + break + r.raise_for_status() + + this_data = r.json() + found = this_data["meta"]["found"] + n = len(this_data["results"]) + logger.info(f"page={page} found={found!r} n={n}") + if n == 0: + break + if n < limit: + logger.info(f"note: results returned ({n}) < limit ({limit})") + data.extend(this_data["results"]) + + if isinstance(found, str) and found.startswith(">"): + print(f"warning: some query results not fetched ('found' is {found!r})") + elif isinstance(found, int) and len(data) < found: + print(f"warning: some query results not fetched (found={found}, got {len(data)} results)") + + return data + + +@_api_key_warning +def get_locations(**kwargs): + """Get available site info (including site IDs) from OpenAQ v2 API. 
+ + kwargs are passed to :func:`_consume`. + + https://api.openaq.org/docs#/v2/locations_get_v2_locations_get + """ + + data = _consume(_ENDPOINTS["locations"], **kwargs) + + # Some fields with scalar values to take + some_scalars = [ + "id", + "name", + "city", + "country", + # "entity", # all null (from /measurements we do get values) + "isMobile", + # "isAnalysis", # all null + # "sensorType", # all null (from /measurements we do get values) + "firstUpdated", + "lastUpdated", + ] + + data2 = [] + for d in data: + lat = d["coordinates"]["latitude"] + lon = d["coordinates"]["longitude"] + parameters = [p["parameter"] for p in d["parameters"]] + mfs = d["manufacturers"] + if mfs: + manufacturer = mfs[0]["manufacturerName"] + if len(mfs) > 1: + logger.info(f"site {d['id']} has multiple manufacturers: {mfs}") + else: + manufacturer = None + d2 = {k: d[k] for k in some_scalars} + d2.update( + latitude=lat, + longitude=lon, + parameters=parameters, + manufacturer=manufacturer, + ) + data2.append(d2) + + df = pd.DataFrame(data2) + + # Compute datetimes (the timestamps are already in UTC, but with tz specified) + assert df.firstUpdated.str.slice(-6, None).eq("+00:00").all() + df["firstUpdated"] = pd.to_datetime(df.firstUpdated.str.slice(0, -6)) + assert df.lastUpdated.str.slice(-6, None).eq("+00:00").all() + df["lastUpdated"] = pd.to_datetime(df.lastUpdated.str.slice(0, -6)) + + # Site ID + df = df.rename(columns={"id": "siteid"}) + df["siteid"] = df.siteid.astype(str) + maybe_dupe_rows = df[df.siteid.duplicated(keep=False)].sort_values("siteid") + if not maybe_dupe_rows.empty: + logger.info( + f"note: found {len(maybe_dupe_rows)} rows with duplicate site IDs:\n{maybe_dupe_rows}" + ) + df = df.drop_duplicates("siteid", keep="first").reset_index(drop=True) # seem to be some dupes + + return df + + +def get_parameters(**kwargs): + """Get supported parameter info from OpenAQ v2 API. + + kwargs are passed to :func:`_consume`. + """ + + data = _consume(_ENDPOINTS["parameters"], **kwargs) + + df = pd.DataFrame(data) + + return df + + +def get_latlonbox_sites(latlonbox, **kwargs): + """From all available sites, return those within a lat/lon box. + + kwargs are passed to :func:`_consume`. + + Parameters + ---------- + latlonbox : array-like of float + ``[lat1, lon1, lat2, lon2]`` (lower-left corner, upper-right corner) + """ + lat1, lon1, lat2, lon2 = latlonbox + sites = get_locations(**kwargs) + + in_box = ( + (sites.latitude >= lat1) + & (sites.latitude <= lat2) + & (sites.longitude >= lon1) + & (sites.longitude <= lon2) + ) + # TODO: need to account for case of box crossing antimeridian + + return sites[in_box].reset_index(drop=True) + + +@_api_key_warning +def add_data( + dates, + *, + parameters=None, + country=None, + search_radius=None, + sites=None, + entity=None, + sensor_type=None, + query_time_split="1H", + wide_fmt=False, # FIXME: probably want to default to True + **kwargs, +): + """Get OpenAQ API v2 data, including low-cost sensors. + + Parameters + ---------- + dates : datetime-like or array-like of datetime-like + One desired date/time or + an array, of which the min and max will be used + as inclusive time bounds of the desired data. + parameters : str or list of str, optional + For example, ``'o3'`` or ``['pm25', 'o3']`` (default). + country : str or list of str, optional + For example, ``'US'`` or ``['US', 'CA']`` (two-letter country codes). + Default: full dataset (no limitation by country). 
+ search_radius : dict, optional + Mapping of coords tuple (lat, lon) [deg] to search radius [m] (max of 25 km). + For example: ``search_radius={(39.0, -77.0): 10_000}``. + Note that this dict can contain multiple entries. + sites : list of str, optional + Site ID(s) to include, e.g. a specific known site + or group of sites from :func:`get_latlonbox_sites`. + Default: full dataset (no limitation by site). + entity : str or list of str, optional + Options: ``'government'``, ``'research'``, ``'community'``. + Default: full dataset (no limitation by entity). + sensor_type : str or list of str, optional + Options: ``'low-cost sensor'``, ``'reference grade'``. + Default: full dataset (no limitation by sensor type). + query_time_split + Frequency to use when splitting the web API queries in time, + in a format that ``pandas.to_timedelta`` will understand. + This is necessary since there is a 100k limit on the number of results. + However, if you are using search radii, e.g., you may want to set this + to something higher in order to increase the query return speed. + Set to ``None`` for no time splitting. + Default: 1 hour + (OpenAQ data are hourly, so setting to something smaller won't help). + Ignored if only one date/time is provided. + wide_fmt : bool + Convert dataframe to wide format (one column per parameter). + Note that for some variables that involves conversion from + µg/m³ to ppmv. + This conversion is based on an average air molecular weight of 29 g/mol + and an air density of 1.2 kg/m³. + Use ``wide_fmt=False`` if you want to do the conversion yourself. + In some cases, the conversion to wide format also reduces the amount of data returned. + retry : int, default: 5 + Number of times to retry an API request if it times out. + timeout : float, default: 10 + Seconds to wait for the server before giving up, for a single request. + threads : int, optional + Number of threads to use for fetching data. + Default: no multi-threading. + """ + + dates = pd.to_datetime(dates) + if pd.api.types.is_scalar(dates): + dates = pd.DatetimeIndex([dates]) + dates = dates.dropna() + if dates.empty: + raise ValueError("must provide at least one datetime-like") + + if parameters is None: + parameters = ["pm25", "o3"] + elif isinstance(parameters, str): + parameters = [parameters] + + query_dt = pd.to_timedelta(query_time_split) if len(dates) > 1 else None + date_min, date_max = dates.min(), dates.max() + if query_dt is not None: + if query_dt <= pd.Timedelta(0): + raise ValueError( + f"query_time_split must be positive, got {query_dt} from {query_time_split!r}" + ) + if date_min == date_max: + raise ValueError( + "must provide at least two unique datetimes to use query_time_split. " + "Set query_time_split=None to disable time splitting." + ) + + if search_radius is not None: + for coords, radius in search_radius.items(): + if not 0 < radius <= 25_000: + raise ValueError( + f"invalid radius {radius!r} for location {coords!r}. " + "Must be positive and <= 25000 (25 km)." + ) + + def iter_time_slices(): + # seems that (from < time <= to) == (from , to] is used + # i.e. 
`from` is exclusive, `to` is inclusive + one_sec = pd.Timedelta(seconds=1) + if query_dt is not None: + t = date_min + while t < date_max: + t_next = min(t + query_dt, date_max) + yield t - one_sec, t_next + t = t_next + else: + yield date_min - one_sec, date_max + + base_params = {} + if country is not None: + base_params.update(country=country) + if sites is not None: + base_params.update(location_id=sites) + if entity is not None: + base_params.update(entity=entity) + if sensor_type is not None: + base_params.update(sensor_type=sensor_type) + + def iter_queries(): + for parameter in parameters: + for t_from, t_to in iter_time_slices(): + if search_radius is not None: + for coords, radius in search_radius.items(): + lat, lon = coords + yield { + **base_params, + "parameter": parameter, + "date_from": t_from, + "date_to": t_to, + "coordinates": f"{lat:.8f},{lon:.8f}", + "radius": radius, + } + else: + yield { + **base_params, + "parameter": parameter, + "date_from": t_from, + "date_to": t_to, + } + + threads = kwargs.pop("threads", None) + if threads is not None: + import concurrent.futures + from itertools import chain + + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + data = chain.from_iterable( + executor.map( + lambda params: _consume(_ENDPOINTS["measurements"], params=params, **kwargs), + iter_queries(), + ) + ) + else: + data = [] + for params in iter_queries(): + this_data = _consume( + _ENDPOINTS["measurements"], + params=params, + **kwargs, + ) + data.extend(this_data) + + df = pd.DataFrame(data) + if df.empty: + print("warning: no data found") + return df + + # # Column Non-Null Count Dtype + # --- ------ -------------- ----- + # 0 locationId 2000 non-null int64 + # 1 location 2000 non-null object + # 2 parameter 2000 non-null object + # 3 value 2000 non-null float64 + # 4 date 2000 non-null object + # 5 unit 2000 non-null object + # 6 coordinates 2000 non-null object + # 7 country 2000 non-null object + # 8 city 0 non-null object # None + # 9 isMobile 2000 non-null bool + # 10 isAnalysis 0 non-null object # None + # 11 entity 2000 non-null object + # 12 sensorType 2000 non-null object + + to_expand = ["date", "coordinates"] + new = pd.json_normalize(json.loads(df[to_expand].to_json(orient="records"))) + + time = pd.to_datetime(new["date.utc"]).dt.tz_localize(None) + # utcoffset = pd.to_timedelta(new["date.local"].str.slice(-6, None) + ":00") + # time_local = time + utcoffset + # ^ Seems some have negative minutes in the tz, so this method complains + time_local = pd.to_datetime(new["date.local"].str.slice(0, 19)) + utcoffset = time_local - time + + # TODO: null case?? 
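+    # (If `coordinates` is null for some rows, their lat/lon appear to end up as NaN here,
+    # whereas the v1 S3 readers drop such rows.)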
+ lat = new["coordinates.latitude"] + lon = new["coordinates.longitude"] + + df = df.drop(columns=to_expand).assign( + time=time, + time_local=time_local, + utcoffset=utcoffset, + latitude=lat, + longitude=lon, + ) + + # Rename columns and ensure site ID is string + df = df.rename( + columns={ + "locationId": "siteid", + "isMobile": "is_mobile", + "isAnalysis": "is_analysis", + "sensorType": "sensor_type", + }, + ) + df["siteid"] = df.siteid.astype(str) + + # Most variables invalid if < 0 + # > preferredUnit.value_counts() + # ppb 19 + # µg/m³ 13 + # ppm 10 + # particles/cm³ 8 + # % 3 relative humidity + # umol/mol 1 + # ng/m3 1 + # deg 1 wind direction + # m/s 1 wind speed + # deg_c 1 + # hpa 1 + # ugm3 1 + # c 1 + # f 1 + # mb 1 + # iaq 1 + non_neg_units = [ + "particles/cm³", + "ppm", + "ppb", + "umol/mol", + "µg/m³", + "ugm3", + "ng/m3", + "iaq", + # + "%", + # + "m/s", + # + "hpa", + "mb", + ] + df.loc[df.unit.isin(non_neg_units) & (df.value < 0), "value"] = np.nan + + if wide_fmt: + # Normalize units + for vn, f in _PPM_TO_UGM3.items(): + is_ug = (df.parameter == vn) & (df.unit == "µg/m³") + df.loc[is_ug, "value"] /= f + df.loc[is_ug, "unit"] = "ppm" + + # Warn if inconsistent units + p_units = df.groupby("parameter").unit.unique() + unique = p_units.apply(len).eq(1) + if not unique.all(): + p_units_non_unique = p_units[~unique] + warnings.warn(f"inconsistent units among parameters:\n{p_units_non_unique}") + + # Certain metadata should be unique for a given site but sometimes aren't + # (e.g. location names of different specificity, slight differences in lat/lon coords) + for col in ["location", "latitude", "longitude"]: + site_col = df.groupby("siteid")[col].unique() + unique = site_col.apply(len).eq(1) + if not unique.all(): + site_col_non_unique = site_col[~unique] + warnings.warn( + f"non-unique {col!r} among site IDs:\n{site_col_non_unique}" "\nUsing first." 
+ ) + df = df.drop(columns=[col]).merge( + site_col.str.get(0), + left_on="siteid", + right_index=True, + how="left", + ) + + # Pivot + index = [ + "siteid", + "time", + "latitude", + "longitude", + "time_local", + "utcoffset", + # + "location", + "city", + "country", + # + "entity", + "sensor_type", + "is_mobile", + "is_analysis", + ] + dupes = df[df.duplicated(keep=False)] + if not dupes.empty: + logging.info(f"found {len(dupes)} duplicated rows") + for col in index: + if df[col].isnull().all(): + index.remove(col) + warnings.warn(f"dropping {col!r} from index for wide fmt (all null)") + df = ( + df.drop_duplicates(keep="first") + .pivot_table( + values="value", + index=index, + columns="parameter", + ) + .reset_index() + ) + + # Rename so that units are clear + df = df.rename(columns={p: f"{p}_ugm3" for p in _NON_MOLEC_PARAMS}, errors="ignore") + df = df.rename(columns={p: f"{p}_ppm" for p in _PPM_TO_UGM3}, errors="ignore") + + return df diff --git a/tests/test_openaq.py b/tests/test_openaq.py index f3ec743b..0fb1de80 100644 --- a/tests/test_openaq.py +++ b/tests/test_openaq.py @@ -1,20 +1,119 @@ import sys +from urllib.error import HTTPError import pandas as pd import pytest from monetio import openaq +if sys.version_info < (3, 7): + pytest.skip("requires Python 3.7+", allow_module_level=True) -@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires Python 3.7+") -def test_openaq(): - # First date in the archive, just one file - # Browse the archive at https://openaq-fetches.s3.amazonaws.com/index.html - dates = pd.date_range(start="2013-11-26", end="2013-11-27", freq="H")[:-1] - try: - df = openaq.add_data(dates) - except PermissionError: - pytest.skip("private") +# openaq._URL_CAP_RANDOM_SAMPLE = True +openaq._URL_CAP = 4 + +# First date in the archive, just one file +# Browse the archive at https://openaq-fetches.s3.amazonaws.com/index.html +FIRST_DAY = pd.date_range(start="2013-11-26", end="2013-11-27", freq="H")[:-1] + +permission_error = pytest.mark.xfail(reason="private", raises=PermissionError, strict=True) + +forbidden_error = pytest.mark.xfail(reason="forbidden", raises=HTTPError, strict=True) # 403 + + +@permission_error +def test_openaq_first_date(): + dates = FIRST_DAY + df = openaq.add_data(dates) assert not df.empty assert df.siteid.nunique() == 1 assert (df.country == "CN").all() and ((df.time_local - df.time) == pd.Timedelta(hours=8)).all() + + assert df.latitude.isnull().sum() == 0 + assert df.longitude.isnull().sum() == 0 + + assert df.dtypes["averagingPeriod"] == "timedelta64[ns]" + assert df.averagingPeriod.eq(pd.Timedelta("1H")).all() + + assert df.pm25_ugm3.gt(0).all() + + +@permission_error +def test_openaq_long_fmt(): + dates = FIRST_DAY + df = openaq.add_data(dates, wide_fmt=False) + + assert not df.empty + + assert {"parameter", "value", "unit"} < set(df.columns) + assert "pm25_ugm3" not in df.columns + assert "pm25" in df.parameter.values + + +@forbidden_error +@pytest.mark.parametrize( + "url", + [ + "https://openaq-fetches.s3.amazonaws.com/realtime/2019-08-01/1564644065.ndjson", # 1 MB + "https://openaq-fetches.s3.amazonaws.com/realtime/2023-09-04/1693798742_realtime_1c4e466d-c461-4c8d-b604-1e81cf2df73a.ndjson", # 10 MB" + ], +) +def test_read(url): + df = openaq.read_json(url) + df2 = openaq.read_json2(url) + + assert len(df) > 0 + + if "2019-08-01" in url: + assert len(df2) < len(df), "some that didn't have coords were skipped" + assert df.latitude.isnull().sum() > 0 + else: + assert len(df2) == len(df) + + assert df.dtypes["averagingPeriod"] == 
"timedelta64[ns]" + assert not df.averagingPeriod.isnull().all() + assert df.averagingPeriod.dropna().gt(pd.Timedelta(0)).all() + + +@permission_error +def test_openaq_2023(): + # Period from Jordan's NRT example (#130) + # There are many files in this period (~ 100?) + # Disable cap setting to test whole set of files + # NOTE: possible to get empty df with the random URL selection + df = openaq.add_data(["2023-09-04", "2023-09-04 23:00"], n_procs=2) + + assert len(df) > 0 + + assert (df.time.astype(str) + df.siteid).nunique() == len(df) + + assert df.dtypes["averagingPeriod"] == "timedelta64[ns]" + assert not df.averagingPeriod.isnull().all() + assert df.averagingPeriod.dropna().gt(pd.Timedelta(0)).all() + + assert df.pm25_ugm3.dropna().gt(0).all() + assert df.o3_ppm.dropna().gt(0).all() + + +def test_parameter_coverage(): + # From https://openaq.org/developers/help/ ("What pollutants are available on OpenAQ?") + # these are the parameters to account for: + params = [ + "pm1", + "pm25", + "pm4", + "pm10", + "bc", + "o3", + "co", + "no2", + "no", + "nox", + "so2", + "ch4", + "co2", + ] + assert len(params) == 13 + assert sorted(openaq.OPENAQ.NON_MOLEC_PARAMS + list(openaq.OPENAQ.PPM_TO_UGM3)) == sorted( + params + ) diff --git a/tests/test_openaq_v2.py b/tests/test_openaq_v2.py new file mode 100644 index 00000000..3e19335f --- /dev/null +++ b/tests/test_openaq_v2.py @@ -0,0 +1,131 @@ +import os + +import pandas as pd +import pytest + +import monetio.obs.openaq_v2 as openaq + +if ( + os.environ.get("CI", "false").lower() not in {"false", "0"} + and os.environ.get("OPENAQ_API_KEY", "") == "" +): + # PRs from forks don't get the secret + pytest.skip("no API key", allow_module_level=True) + +LATLON_NCWCP = 38.9721, -76.9248 +SITES_NEAR_NCWCP = [ + # AirGradient monitor + 1236068, + 1719392, + # # PurpleAir sensors + # 1118827, + # 357301, + # 273440, + # 271155, + # NASA GSFC + 2978434, + # Beltsville (AirNow) + 3832, + 843, +] + + +def test_get_parameters(): + params = openaq.get_parameters() + assert 50 <= len(params) <= 500 + assert params.id.nunique() == len(params) + assert params.name.nunique() < len(params), "dupes for different units etc." 
+ assert "pm25" in params.name.values + assert "o3" in params.name.values + + +def test_get_locations(): + sites = openaq.get_locations(npages=2, limit=100) + assert len(sites) <= 200 + assert sites.siteid.nunique() == len(sites) + assert sites.dtypes["firstUpdated"] == "datetime64[ns]" + assert sites.dtypes["lastUpdated"] == "datetime64[ns]" + assert sites.dtypes["latitude"] == "float64" + assert sites.dtypes["longitude"] == "float64" + assert sites["latitude"].isnull().sum() == 0 + assert sites["longitude"].isnull().sum() == 0 + + +def test_get_data_near_ncwcp_sites(): + sites = SITES_NEAR_NCWCP + dates = pd.date_range("2023-08-01", "2023-08-01 01:00", freq="1H") + df = openaq.add_data(dates, sites=sites) + assert len(df) > 0 + assert "pm25" in df.parameter.values + assert df.latitude.round().eq(39).all() + assert df.longitude.round().eq(-77).all() + assert (sorted(df.time.unique()) == dates).all() + assert set(df.siteid) <= {str(site) for site in sites} + assert not df.value.isna().all() and not df.value.lt(0).any() + + +def test_get_data_near_ncwcp_sites_wide(): + sites = SITES_NEAR_NCWCP + dates = pd.date_range("2023-08-01", "2023-08-01 01:00", freq="1H") + + with pytest.warns(UserWarning, match=r"dropping '.*' from index for wide fmt \(all null\)"): + df = openaq.add_data(dates, sites=sites, wide_fmt=True) + assert len(df) > 0 + assert {"pm25_ugm3", "o3_ppm"} <= set(df.columns) + assert not {"parameter", "value", "unit"} <= set(df.columns) + + +def test_get_data_near_ncwcp_search_radius(): + latlon = LATLON_NCWCP + dates = pd.date_range("2023-08-01", "2023-08-01 01:00", freq="1H") + df = openaq.add_data(dates, search_radius={latlon: 10_000}, threads=2) + assert len(df) > 0 + assert "pm25" in df.parameter.values + assert df.latitude.round().eq(39).all() + assert df.longitude.round().eq(-77).all() + assert (sorted(df.time.unique()) == dates).all() + assert not df.sensor_type.eq("low-cost sensor").all() + assert df.entity.eq("Governmental Organization").all() + + +def test_get_data_near_ncwcp_sensor_type(): + latlon = LATLON_NCWCP + dates = pd.date_range("2023-08-01", "2023-08-01 03:00", freq="1H") + df = openaq.add_data(dates, sensor_type="low-cost sensor", search_radius={latlon: 25_000}) + assert len(df) > 0 + assert df.sensor_type.eq("low-cost sensor").all() + + +def test_get_data_single_dt_single_site(): + site = 843 + dates = "2023-08-01" + df = openaq.add_data(dates, parameters="o3", sites=site) + assert len(df) == 1 + + +@pytest.mark.parametrize( + "entity", + [ + "research", + "community", + ["research", "community"], + ], +) +def test_get_data_near_ncwcp_entity(entity): + latlon = LATLON_NCWCP + dates = pd.date_range("2023-08-01", "2023-08-01 01:00", freq="1H") + df = openaq.add_data(dates, entity=entity, search_radius={latlon: 25_000}) + assert df.empty + + +@pytest.mark.parametrize( + "radius", + [ + 0, + -1, + 25001, + ], +) +def test_get_data_bad_radius(radius): + with pytest.raises(ValueError, match="invalid radius"): + openaq.add_data(["2023-08-01", "2023-08-02"], search_radius={LATLON_NCWCP: radius})
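A minimal usage sketch of the two entry points added here (the dates, coordinates, and search radius are illustrative; the v2 reader assumes an OPENAQ_API_KEY environment variable is set):

    import pandas as pd

    from monetio import openaq, openaq_v2

    dates = pd.date_range("2023-09-04", "2023-09-04 23:00", freq="H")

    # v1 (government-only) data from the openaq-fetches S3 bucket,
    # wide format (one column per parameter) by default
    df_v1 = openaq.add_data(dates, n_procs=2)

    # v2 REST API, including low-cost sensors; long format by default
    df_v2 = openaq_v2.add_data(
        dates,
        parameters=["pm25", "o3"],
        search_radius={(38.9721, -76.9248): 10_000},  # (lat, lon) -> radius [m], max 25 km
        wide_fmt=True,
    )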