From 4a839bb6c21d250186fcffe5b48a82efc87c3dd1 Mon Sep 17 00:00:00 2001
From: Kristen Thyng
Date: Wed, 24 Jul 2024 11:59:14 -0500
Subject: [PATCH] updated to intake v2!

---
 README.md                 |  21 +++-
 environment.yml           |   6 +-
 intake_coops/__init__.py  |  25 +----
 intake_coops/coops.py     | 221 +++++++++++++++-----------------------
 intake_coops/coops_cat.py |  71 ++++++------
 setup.py                  |   6 +-
 6 files changed, 145 insertions(+), 205 deletions(-)

diff --git a/README.md b/README.md
index 7eb9eff..ad0a459 100644
--- a/README.md
+++ b/README.md
@@ -51,11 +51,11 @@ Install package locally in package directory
 
-If you input to `intake.open_coops_cat()` the keyword argument `process_adcp=True`, the ADCP Dataset will contain velocity on u and v components, along- and across-channel components, and along- and across-channel subtidal signal (processed with pl33 tidal filter, also included).
+If you pass the keyword argument `process_adcp=True` to `intake_coops.COOPSCatalogReader()`, the ADCP Dataset will contain u and v velocity components, along- and across-channel components, and along- and across-channel subtidal signals (processed with the pl33 tidal filter, which is included).
 
-```
-import intake
+```python
+import intake_coops
 
 stations = ["COI0302", "COI0512"]
-cat = intake.open_coops_cat(stations)
+cat = intake_coops.COOPSCatalogReader(stations).read()
 
 # sources in catalog
 print(list(cat))
@@ -65,6 +65,21 @@
 print(cat["COI0302"])
 
 # read in data to a Dataset
 ds = cat["COI0302"].read()
+ds
+```
+
+```python
+<xarray.Dataset> Size: 3MB
+Dimensions:    (t: 8399, depth: 13)
+Coordinates:
+  * t          (t) datetime64[ns] 67kB 2003-07-16T00:08:00 ... 2003-08-19T23:...
+  * depth      (depth) float64 104B 0.03 1.04 2.04 3.02 ... 10.03 11.03 12.04
+    longitude  float64 8B -149.9
+    latitude   float64 8B 61.27
+Data variables:
+    b          (t, depth) float64 873kB 13.0 12.0 11.0 10.0 ... 4.0 3.0 2.0 1.0
+    d          (t, depth) float64 873kB 22.0 44.0 55.0 ... 211.0 211.0 212.0
+    s          (t, depth) float64 873kB 37.1 55.6 45.4 21.3 ... 83.0 79.2 76.0
 ```
 
diff --git a/environment.yml b/environment.yml
index 3a7943b..00fbdb4 100644
--- a/environment.yml
+++ b/environment.yml
@@ -4,8 +4,9 @@ channels:
 dependencies:
-  # Required for full project functionality (dont remove)
+  # Required for full project functionality (don't remove)
   - pytest
-  - cf_pandas
-  - intake
+  - cf-pandas
+  - cf-xarray
+  # - intake
   - ipython
   # - noaa_coops
   - numpy
@@ -13,4 +14,5 @@ dependencies:
   - pip
   - xarray
   - pip:
+    - git+https://github.com/intake/intake.git
     - noaa_coops

diff --git a/intake_coops/__init__.py b/intake_coops/__init__.py
index 2064481..a103472 100644
--- a/intake_coops/__init__.py
+++ b/intake_coops/__init__.py
@@ -1,23 +1,6 @@
-"""
-intake-axds: Intake approach for Axiom assets.
-"""
+"""intake-coops package."""
 
-# from .axds_cat import AXDSCatalog
-# from .utils import (  # noqa: F401
-#     _get_version,
-#     available_names,
-#     match_key_to_parameter,
-#     return_parameter_options,
-# )
+import intake  # noqa: F401
 
-
-# __version__ = _get_version()
-
-from importlib.metadata import PackageNotFoundError, version
-
-
-try:
-    __version__ = version("intake-coops")
-except PackageNotFoundError:
-    # package is not installed
-    __version__ = "unknown"
+from .coops import COOPSDataframeReader, COOPSXarrayReader
+from .coops_cat import COOPSCatalogReader

diff --git a/intake_coops/coops.py b/intake_coops/coops.py
index f2d16b1..3b51fb1 100644
--- a/intake_coops/coops.py
+++ b/intake_coops/coops.py
@@ -1,19 +1,19 @@
 """
-Source for CO-OPS data.
+Reader for CO-OPS data.
 """
 
 import cf_pandas  # noqa: F401
+import cf_xarray  # noqa: F401
 import noaa_coops as nc
 import numpy as np
 import pandas as pd
 import xarray as xr
-from intake.source import base
+from intake.readers.readers import BaseReader
 
-from . import __version__
-
-class COOPSDataframeSource(base.DataSource):
+
+class COOPSDataframeReader(BaseReader):
     """
     Parameters
     ----------
@@ -25,88 +25,59 @@ class COOPSDataframeSource(base.DataSource):
         Dataframe
     """
 
-    name = "coops-dataframe"
-    version = __version__
-    container = "dataframe"
-    partition_access = True
-
-    def __init__(self, stationid: str, metadata={}):
-
-        self._dataframe = None
-        self.stationid = stationid
-        # self.metadata = metadata
-        self.s = nc.Station(self.stationid)
-
-        super(COOPSDataframeSource, self).__init__(metadata=metadata)
-
-    def _get_schema(self) -> base.Schema:
-        if self._dataframe is None:
-            # TODO: could do partial read with chunksize to get likely schema from
-            # first few records, rather than loading the whole thing
-            self._load()
-            self._dataset_metadata = self._get_dataset_metadata()
-        # make type checker happy
-        assert self._dataframe is not None
-        return base.Schema(
-            datashape=None,
-            dtype=self._dataframe.dtypes,
-            shape=self._dataframe.shape,
-            npartitions=1,
-            extra_metadata=self._dataset_metadata,
-        )
-
-    def _get_partition(self) -> pd.DataFrame:
-        if self._dataframe is None:
-            self._load_metadata()
-        return self._dataframe
+    output_instance = "pandas:DataFrame"
 
-    def read(self) -> pd.DataFrame:
-        """Return the dataframe from ERDDAP"""
-        return self._get_partition()
-
-    def _load(self):
-        """How to load in a specific station once you know it by dataset_id"""
+    def _read(self, stationid: str):
+        """Load the data for one station, given its station ID."""
+
+        s = self._return_station_object(stationid)
 
-        begin_date = pd.Timestamp(self.s.deployed).strftime("%Y%m%d")
-        end_date = pd.Timestamp(self.s.retrieved).strftime("%Y%m%d")
+        begin_date = pd.Timestamp(s.deployed).strftime("%Y%m%d")
+        end_date = pd.Timestamp(s.retrieved).strftime("%Y%m%d")
 
         dfs = []
-        for bin in self.s.bins["bins"]:
+        for bin in s.bins["bins"]:
             depth = bin["depth"]
             num = bin["num"]
-            df = self.s.get_data(
+            dft = s.get_data(
                 begin_date=begin_date,
                 end_date=end_date,
                 product="currents",
                 bin_num=num,
             )
-            df["depth"] = depth
-            dfs.append(df)
-        self._dataframe = pd.concat(dfs)
-
-    def _get_dataset_metadata(self):
-        """Load metadata once data is loaded."""
-        # self._load()
-        # metadata = {}
-        metadata = self.s.deployments
-        metadata.update(self.s.lat_lon)
-        metadata.update(
-            {
-                "name": self.s.name,
-                "observe_dst": self.s.observe_dst,
-                "project": self.s.project,
-                "project_type": self.s.project_type,
-                "timezone_offset": self.s.timezone_offset,
-                "units": self.s.units,
-            }
-        )
-        return metadata
-
-    def _close(self):
-        self._dataframe = None
-
-
-class COOPSXarraySource(COOPSDataframeSource):
-    """Converts returned DataFrame into Dataset which for ADCP data is more appropriate.
+            dft["depth"] = depth
+            dfs.append(dft)
+        return pd.concat(dfs)
+
+    def _return_station_object(self, stationid: str):
+        """Return station object."""
+        return nc.Station(stationid)
+
+    def _get_dataset_metadata(self, stationid: str):
+        """Return the full station metadata."""
+        s = self._return_station_object(stationid)
+        return s.metadata
+
+
+class COOPSXarrayReader(COOPSDataframeReader):
+    """Converts the returned DataFrame into a Dataset, which is more appropriate for ADCP data.
@@ -116,52 +87,46 @@ class COOPSXarraySource(COOPSDataframeSource):
         Dataset
     """
 
-    name = "coops-xarray"
-    version = __version__
-    container = "xarray"
-    partition_access = True
+    output_instance = "xarray:Dataset"
 
-    def __init__(self, stationid, process_adcp: bool = False, metadata={}):
-        """Initialize."""
-
-        self._ds = None
-        # self.stationid = stationid
-        # self.metadata = metadata
-        self._process_adcp = process_adcp
-
-        self.source = COOPSDataframeSource(stationid, metadata)
-
-        # self.s = nc.Station(self.stationid)
+    def _read(self, stationid: str, process_adcp: bool = False):
+        """Read as DataFrame but convert to Dataset."""
 
-        super(COOPSXarraySource, self).__init__(stationid=stationid, metadata=metadata)
+        reader = COOPSDataframeReader(stationid)
+        df = reader.read()
 
-    def _load(self):
-        """Read as DataFrame but convert to Dataset."""
-        df = self.source.read()
         inds = [df.cf["T"].name, df.cf["Z"].name]
-        self._ds = (
+
+        ds = (
             df.reset_index()
             .set_index(inds)
             .sort_index()
             .pivot_table(index=inds)
             .to_xarray()
         )
-        self._ds["t"].attrs = {"standard_name": "time"}
-        self._ds["depth"].attrs = {
+        ds["t"].attrs = {"standard_name": "time"}
+        ds["depth"].attrs = {
             "standard_name": "depth",
             "axis": "Z",
         }
-        self._ds["longitude"] = self.metadata["lon"]
-        self._ds["longitude"].attrs = {"standard_name": "longitude"}
-        self._ds["latitude"] = self.metadata["lat"]
-        self._ds["latitude"].attrs = {"standard_name": "latitude"}
-        self._ds = self._ds.assign_coords(
-            {"longitude": self._ds["longitude"], "latitude": self._ds["latitude"]}
+        metadata = self._get_dataset_metadata(stationid)
+
+        ds["longitude"] = metadata["lng"]
+        ds["longitude"].attrs = {"standard_name": "longitude"}
+        ds["latitude"] = metadata["lat"]
+        ds["latitude"].attrs = {"standard_name": "latitude"}
+        ds = ds.assign_coords(
+            {"longitude": ds["longitude"], "latitude": ds["latitude"]}
        )
 
-        if self._process_adcp:
-            self.process_adcp()
-
-    def process_adcp(self):
+        if process_adcp:
+            ds = self.process_adcp(metadata, ds)
+
+        return ds
+
+    def process_adcp(self, metadata, ds):
         """Process ADCP data.
         Returns
@@ -169,61 +134,47 @@ def process_adcp(self):
         -------
         Dataset
             With u and v, ualong and vacross, and subtidal versions ualong_subtidal, vacross_subtidal
         """
-        theta = self.source.metadata["flood_direction_degrees"]
-        self._ds["u"] = (
-            np.cos(np.deg2rad(self._ds.cf["dir"])) * self._ds.cf["speed"] / 100
+        theta = metadata["deployments"]["flood_direction_degrees"]
+        ds["u"] = (
+            np.cos(np.deg2rad(ds.cf["dir"])) * ds.cf["speed"] / 100
         )
-        self._ds["v"] = (
-            np.sin(np.deg2rad(self._ds.cf["dir"])) * self._ds.cf["speed"] / 100
+        ds["v"] = (
+            np.sin(np.deg2rad(ds.cf["dir"])) * ds.cf["speed"] / 100
         )
-        self._ds["ualong"] = self._ds["u"] * np.cos(np.deg2rad(theta)) + self._ds[
+        ds["ualong"] = ds["u"] * np.cos(np.deg2rad(theta)) + ds[
             "v"
         ] * np.sin(np.deg2rad(theta))
-        self._ds["vacross"] = -self._ds["u"] * np.sin(np.deg2rad(theta)) + self._ds[
+        ds["vacross"] = -ds["u"] * np.sin(np.deg2rad(theta)) + ds[
             "v"
         ] * np.cos(np.deg2rad(theta))
-        self._ds["s"] /= 100
-        self._ds["s"].attrs = {"standard_name": "sea_water_speed", "units": "m s-1"}
-        self._ds["d"].attrs = {
+        ds["s"] /= 100
+        ds["s"].attrs = {"standard_name": "sea_water_speed", "units": "m s-1"}
+        ds["d"].attrs = {
             "standard_name": "sea_water_velocity_to_direction",
             "units": "degree",
         }
-        self._ds["u"].attrs = {
+        ds["u"].attrs = {
             "standard_name": "eastward_sea_water_velocity",
             "units": "m s-1",
         }
-        self._ds["v"].attrs = {
+        ds["v"].attrs = {
             "standard_name": "northward_sea_water_velocity",
             "units": "m s-1",
         }
-        self._ds["ualong"].attrs = {
-            "Long name": "Along channel velocity",
+        ds["ualong"].attrs = {
+            "long_name": "Along channel velocity",
             "units": "m s-1",
         }
-        self._ds["vacross"].attrs = {
-            "Long name": "Across channel velocity",
+        ds["vacross"].attrs = {
+            "long_name": "Across channel velocity",
             "units": "m s-1",
         }
 
         # calculate subtidal velocities
-        self._ds["ualong_subtidal"] = tidal_filter(self._ds["ualong"])
-        self._ds["vacross_subtidal"] = tidal_filter(self._ds["vacross"])
-
-    def to_dask(self):
-        """Read data."""
-        self.read()
-        return self._ds
-
-    def read(self):
-        """Read data."""
-        if self._ds is None:
-            # self._load_metadata()
-            self._load()
-        return self._ds
-
-    def _close(self):
-        self._ds = None
-        self._schema = None
+        ds["ualong_subtidal"] = tidal_filter(ds["ualong"])
+        ds["vacross_subtidal"] = tidal_filter(ds["vacross"])
+
+        return ds
 
 
 class plfilt(object):

diff --git a/intake_coops/coops_cat.py b/intake_coops/coops_cat.py
index f99d905..1e28148 100644
--- a/intake_coops/coops_cat.py
+++ b/intake_coops/coops_cat.py
@@ -3,22 +3,21 @@
 """
 
-from intake.catalog.base import Catalog
-from intake.catalog.local import LocalCatalogEntry
+from intake.readers.readers import BaseReader
+from intake.readers.entry import Catalog, DataDescription
 
-from . import __version__
-from .coops import COOPSDataframeSource, COOPSXarraySource
+from .coops import COOPSDataframeReader, COOPSXarrayReader
 
 
-class COOPSCatalog(Catalog):
+class COOPSCatalogReader(BaseReader):
     """
-    Makes data sources out of all datasets for a given AXDS data type.
-
-    Have this cover all data types for now, then split out.
+    Makes data readers out of the datasets for the given NOAA CO-OPS stations.
     """
 
     name = "coops_cat"
-    version = __version__
+    output_instance = "intake.readers.entry:Catalog"
 
     def __init__(
         self,
@@ -28,8 +27,8 @@ def __init__(
         name: str = "catalog",
         description: str = "Catalog of NOAA CO-OPS assets.",
         metadata: dict = None,
-        include_source_metadata: bool = True,
-        ttl: int = 86400,
+        include_reader_metadata: bool = True,
         **kwargs,
     ):
         """Initialize a NOAA CO-OPS Catalog.
@@ -53,56 +52,46 @@ def __init__(
         """
         self.station_list = station_list
-        self.include_source_metadata = include_source_metadata
+        self.include_reader_metadata = include_reader_metadata
         self._process_adcp = process_adcp
 
         # Put together catalog-level stuff
         metadata = metadata or {}
         # metadata["station_list"] = self.station_list
-
-        super(COOPSCatalog, self).__init__(
-            **kwargs, ttl=ttl, name=name, description=description, metadata=metadata
+
+        super(COOPSCatalogReader, self).__init__(
+            metadata=metadata,
         )
-
-    def _load(self):
-        """Find all dataset ids and create catalog."""
-        self._entries = {}
-
-        for station_id in self.station_list:
-
-            # if self.verbose:
-            #     print(f"Dataset ID: {dataset_id}")
-
-            # description = f"AXDS dataset_id {dataset_id} of datatype {self.datatype}"
+
+    def read(self):
+        """Create a catalog entry for each station ID."""
+        plugin = "intake_coops.coops:COOPSXarrayReader"
 
-            plugin = COOPSXarraySource
+        entries, aliases = {}, {}
+        for station_id in self.station_list:
             args = {
                 "stationid": station_id,
                 "process_adcp": self._process_adcp,
             }
 
-            if self.include_source_metadata:
-                metadata = COOPSDataframeSource(station_id)._get_dataset_metadata()
+            if self.include_reader_metadata:
+                metadata = COOPSDataframeReader(station_id)._get_dataset_metadata(station_id)
             else:
                 metadata = {}
 
-            entry = LocalCatalogEntry(
-                name=station_id,
-                description="",  # description,
-                driver=plugin,
-                direct_access="allow",
-                args=args,
+            entries[station_id] = DataDescription(
+                plugin,
+                kwargs=args,
                 metadata=metadata,
-                # True,
-                # args,
-                # {},
-                # {},
-                # {},
-                # "",
-                # getenv=False,
-                # getshell=False,
             )
+            aliases[station_id] = station_id
 
-            self._entries[station_id] = entry
+        cat = Catalog(
+            data=entries,
+            aliases=aliases,
+        )
+        return cat

diff --git a/setup.py b/setup.py
index cf8310c..c3e0969 100644
--- a/setup.py
+++ b/setup.py
@@ -9,9 +9,9 @@
     },
     entry_points={
        "intake.drivers": [
-            "coops-dataframe = intake_coops.coops:COOPSDataframeSource",
-            "coops-xarray = intake_coops.coops:COOPSXarraySource",
-            "coops_cat = intake_coops.coops_cat:COOPSCatalog",
+            "coops-dataframe = intake_coops.coops:COOPSDataframeReader",
+            "coops-xarray = intake_coops.coops:COOPSXarrayReader",
+            "coops_cat = intake_coops.coops_cat:COOPSCatalogReader",
         ]
     },
 )
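For quick reference, the pieces above fit together end to end as below. This is a minimal sketch based on the README example in this patch; it assumes intake v2's `BaseReader` forwards constructor arguments to `_read()`, which is how the patch's own `COOPSDataframeReader(stationid).read()` call inside `COOPSXarrayReader._read` behaves.

```python
import intake_coops

# Build a catalog over several stations, as in the README example.
cat = intake_coops.COOPSCatalogReader(["COI0302", "COI0512"]).read()
ds = cat["COI0302"].read()

# Or read one ADCP station directly, skipping the catalog step
# (assumes BaseReader forwards these arguments to _read);
# process_adcp=True adds u/v, along-/across-channel, and subtidal variables.
reader = intake_coops.COOPSXarrayReader("COI0302", process_adcp=True)
ds = reader.read()
```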
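The central reshaping step in `COOPSXarrayReader._read` is the `set_index(...).pivot_table(...).to_xarray()` chain, which turns the long-format currents DataFrame (one row per time/bin sample) into a 2-D Dataset. A toy sketch of that pivot, with made-up values shaped like the README output above:

```python
import pandas as pd

# Long-format rows keyed by (time, depth), as returned per ADCP bin.
df = pd.DataFrame(
    {
        "t": pd.to_datetime(["2003-07-16 00:08", "2003-07-16 00:08",
                             "2003-07-16 00:14", "2003-07-16 00:14"]),
        "depth": [0.03, 1.04, 0.03, 1.04],
        "s": [37.1, 55.6, 45.4, 21.3],  # speed, cm/s before scaling
    }
)

inds = ["t", "depth"]
# Group duplicate (t, depth) keys, then promote the MultiIndex to dimensions.
ds = df.set_index(inds).sort_index().pivot_table(index=inds).to_xarray()
print(ds)  # Dimensions: (t: 2, depth: 2); data variable: s
```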
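The along-/across-channel decomposition in `process_adcp` is a plane rotation of the (u, v) velocities through the flood direction theta: `ualong = u*cos(theta) + v*sin(theta)` and `vacross = -u*sin(theta) + v*cos(theta)`. A small self-check with hypothetical numbers:

```python
import numpy as np

theta = np.deg2rad(30.0)  # hypothetical flood direction, converted to radians
u, v = 0.4, 0.1           # hypothetical velocity components, m/s

# Same rotation as process_adcp, applied to scalars instead of a Dataset.
ualong = u * np.cos(theta) + v * np.sin(theta)
vacross = -u * np.sin(theta) + v * np.cos(theta)

# A rotation preserves the current speed:
assert np.isclose(np.hypot(u, v), np.hypot(ualong, vacross))
```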