Merge pull request #6 from axiom-data-science/update_intake_v2
updated to intake v2!
kthyng authored Jul 24, 2024
2 parents c0d034f + 4a839bb commit 17186ed
Showing 6 changed files with 145 additions and 205 deletions.
README.md: 21 changes (18 additions & 3 deletions)

@@ -51,11 +51,11 @@ Install package locally in package directory

If you input to `intake.open_coops_cat()` the keyword argument `process_adcp=True`, the ADCP Dataset will contain velocity on u and v components, along- and across-channel components, and along- and across-channel subtidal signal (processed with pl33 tidal filter, also included).

-```
-import intake
+```python
+import intake_coops

stations = ["COI0302", "COI0512"]
-cat = intake.open_coops_cat(stations)
+cat = intake_coops.COOPSCatalogReader(stations).read()

# sources in catalog
print(list(cat))
@@ -65,6 +65,21 @@ print(cat["COI0302"])

# read in data to a Dataset
ds = cat["COI0302"].read()
+ds
```

+```python
+<xarray.Dataset> Size: 3MB
+Dimensions:    (t: 8399, depth: 13)
+Coordinates:
+  * t          (t) datetime64[ns] 67kB 2003-07-16T00:08:00 ... 2003-08-19T23:...
+  * depth      (depth) float64 104B 0.03 1.04 2.04 3.02 ... 10.03 11.03 12.04
+    longitude  float64 8B -149.9
+    latitude   float64 8B 61.27
+Data variables:
+    b          (t, depth) float64 873kB 13.0 12.0 11.0 10.0 ... 4.0 3.0 2.0 1.0
+    d          (t, depth) float64 873kB 22.0 44.0 55.0 ... 211.0 211.0 212.0
+    s          (t, depth) float64 873kB 37.1 55.6 45.4 21.3 ... 83.0 79.2 76.0
+```


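For reference, a minimal usage sketch of the `process_adcp=True` path described in the README above. Only the `_read(stationid, process_adcp=False)` signature of `COOPSXarrayReader` is visible in this diff, so calling the xarray reader directly with that flag is an assumption (the catalog-level plumbing lives in `coops_cat.py`, which is not shown here):

```python
import intake_coops

# Assumed calling convention, mirroring COOPSDataframeReader(stationid).read()
# and the _read(stationid, process_adcp=False) signature in coops.py below.
ds = intake_coops.COOPSXarrayReader("COI0302", process_adcp=True).read()

# process_adcp=True adds u/v, along- and across-channel velocities, and
# their subtidal (pl33-filtered) counterparts to the Dataset.
print(ds[["u", "v", "ualong", "vacross", "ualong_subtidal", "vacross_subtidal"]])
```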
environment.yml: 6 changes (4 additions & 2 deletions)

@@ -4,13 +4,15 @@ channels:
dependencies:
  # Required for full project functionality (dont remove)
  - pytest
-  - cf_pandas
-  - intake
+  - cf-pandas
+  - cf-xarray
+  # - intake
  - ipython
  # - noaa_coops
  - numpy
  - pandas
  - pip
  - xarray
  - pip:
+    - git+https://github.com/intake/intake.git
    - noaa_coops
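Since intake is now installed from its development branch via pip rather than from conda, a quick runtime sanity check can confirm that a 2.x build was resolved (a sketch; assumes intake's usual `__version__` attribute):

```python
import intake

# The readers in this PR subclass intake.readers.readers.BaseReader,
# which exists only in intake v2.
assert intake.__version__.startswith("2"), intake.__version__
```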
intake_coops/__init__.py: 25 changes (4 additions & 21 deletions)

@@ -1,23 +1,6 @@
-"""
-intake-axds: Intake approach for Axiom assets.
-"""
+"""intake-coops package."""

-# from .axds_cat import AXDSCatalog
-# from .utils import (  # noqa: F401
-#     _get_version,
-#     available_names,
-#     match_key_to_parameter,
-#     return_parameter_options,
-# )
+import intake  # noqa: F401

-
-# __version__ = _get_version()
-
-from importlib.metadata import PackageNotFoundError, version
-
-
-try:
-    __version__ = version("intake-coops")
-except PackageNotFoundError:
-    # package is not installed
-    __version__ = "unknown"
+from .coops import COOPSDataframeReader, COOPSXarrayReader
+from .coops_cat import COOPSCatalogReader
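After this simplification the package's public surface is just the three reader classes; a minimal sketch of the new imports (station IDs from the README example):

```python
from intake_coops import (
    COOPSCatalogReader,    # list of station IDs -> intake catalog
    COOPSDataframeReader,  # one station -> pandas DataFrame
    COOPSXarrayReader,     # one station -> xarray Dataset
)

cat = COOPSCatalogReader(["COI0302", "COI0512"]).read()
print(list(cat))  # station IDs available as catalog entries
```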
intake_coops/coops.py: 221 changes (86 additions & 135 deletions)

@@ -1,19 +1,19 @@
"""
-Source for CO-OPS data.
+Reader for CO-OPS data.
"""

-from typing import Optional
+import cf_pandas  # noqa: F401
+import cf_xarray  # noqa: F401
import noaa_coops as nc
import numpy as np
import pandas as pd
import xarray as xr

-from intake.source import base
+from intake.readers.readers import BaseReader

-from . import __version__


-class COOPSDataframeSource(base.DataSource):
+class COOPSDataframeReader(BaseReader):
    """
    Parameters
    ----------
@@ -25,88 +25,59 @@ class COOPSDataframeSource(base.DataSource):
    Dataframe
    """

-    name = "coops-dataframe"
-    version = __version__
-    container = "dataframe"
-    partition_access = True
-
-    def __init__(self, stationid: str, metadata={}):
-
-        self._dataframe = None
-        self.stationid = stationid
-        # self.metadata = metadata
-        self.s = nc.Station(self.stationid)
-
-        super(COOPSDataframeSource, self).__init__(metadata=metadata)
-
-    def _get_schema(self) -> base.Schema:
-        if self._dataframe is None:
-            # TODO: could do partial read with chunksize to get likely schema from
-            # first few records, rather than loading the whole thing
-            self._load()
-            self._dataset_metadata = self._get_dataset_metadata()
-        # make type checker happy
-        assert self._dataframe is not None
-        return base.Schema(
-            datashape=None,
-            dtype=self._dataframe.dtypes,
-            shape=self._dataframe.shape,
-            npartitions=1,
-            extra_metadata=self._dataset_metadata,
-        )
-
-    def _get_partition(self) -> pd.DataFrame:
-        if self._dataframe is None:
-            self._load_metadata()
-        return self._dataframe
+    output_instance = "pandas:DataFrame"

-    def read(self) -> pd.DataFrame:
-        """Return the dataframe from ERDDAP"""
-        return self._get_partition()
-
-    def _load(self):
+    def _read(self, stationid: str):
        """How to load in a specific station once you know it by dataset_id"""

+        # s = nc.Station(stationid)
+        s = self._return_station_object(stationid)
+
-        begin_date = pd.Timestamp(self.s.deployed).strftime("%Y%m%d")
-        end_date = pd.Timestamp(self.s.retrieved).strftime("%Y%m%d")
+        begin_date = pd.Timestamp(s.deployed).strftime("%Y%m%d")
+        end_date = pd.Timestamp(s.retrieved).strftime("%Y%m%d")

        dfs = []
-        for bin in self.s.bins["bins"]:
+        for bin in s.bins["bins"]:
            depth = bin["depth"]
            num = bin["num"]
-            df = self.s.get_data(
+            dft = s.get_data(
                begin_date=begin_date,
                end_date=end_date,
                product="currents",
                bin_num=num,
            )
-            df["depth"] = depth
-            dfs.append(df)
-        self._dataframe = pd.concat(dfs)
-
-    def _get_dataset_metadata(self):
+            dft["depth"] = depth
+            dfs.append(dft)
+        return pd.concat(dfs)
+
+    def _return_station_object(self, stationid: str):
+        """Return station object."""
+        return nc.Station(stationid)
+
+    def _get_dataset_metadata(self, stationid: str):
        """Load metadata once data is loaded."""
-        # self._load()
-        # metadata = {}
-        metadata = self.s.deployments
-        metadata.update(self.s.lat_lon)
-        metadata.update(
-            {
-                "name": self.s.name,
-                "observe_dst": self.s.observe_dst,
-                "project": self.s.project,
-                "project_type": self.s.project_type,
-                "timezone_offset": self.s.timezone_offset,
-                "units": self.s.units,
-            }
-        )
-        return metadata
-
-    def _close(self):
-        self._dataframe = None
-
-
-class COOPSXarraySource(COOPSDataframeSource):
+        s = self._return_station_object(stationid)
+        # metadata = s.deployments
+        # metadata.update(s.lat_lon)
+        # metadata.update(
+        #     {
+        #         "name": s.name,
+        #         "observe_dst": s.observe_dst,
+        #         "project": s.project,
+        #         "project_type": s.project_type,
+        #         "timezone_offset": s.timezone_offset,
+        #         "units": s.units,
+        #     }
+        # )
+        # import pdb; pdb.set_trace()
+        # if moremetadata:
+        #     metadata.update(s.metadata)
+        return s.metadata
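A usage sketch for `COOPSDataframeReader`, matching how `COOPSXarrayReader._read` below instantiates it (station ID from the README example):

```python
from intake_coops.coops import COOPSDataframeReader

# _read() loops over the station's ADCP bins, fetches a "currents" DataFrame
# per bin via noaa_coops, tags each with its bin depth, and concatenates them.
df = COOPSDataframeReader("COI0302").read()
print(sorted(df["depth"].unique()))  # one value per ADCP bin depth
```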


+class COOPSXarrayReader(COOPSDataframeReader):
"""Converts returned DataFrame into Dataset
which for ADCP data is more appropriate.
@@ -116,114 +116,94 @@ class COOPSXarraySource(COOPSDataframeSource):
    Dataset
    """

-    name = "coops-xarray"
-    version = __version__
-    container = "xarray"
-    partition_access = True
+    output_instance = "xarray:Dataset"

-    def __init__(self, stationid, process_adcp: bool = False, metadata={}):
-        """Initialize."""
-
-        self._ds = None
-        # self.stationid = stationid
-        # self.metadata = metadata
-        self._process_adcp = process_adcp
-
-        self.source = COOPSDataframeSource(stationid, metadata)
-
-        # self.s = nc.Station(self.stationid)
-
-        super(COOPSXarraySource, self).__init__(stationid=stationid, metadata=metadata)
-
-    def _load(self):
-        """Read as DataFrame but convert to Dataset."""
-        df = self.source.read()
+    def _read(self, stationid, process_adcp: bool = False):  # , metadata_in: Optional[dict] = None):
+        """Read as DataFrame but convert to Dataset."""
+        # metadata_in = metadata_in or {}
+
+        reader = COOPSDataframeReader(stationid)
+
+        df = reader.read()

        inds = [df.cf["T"].name, df.cf["Z"].name]
-        self._ds = (
+
+        ds = (
            df.reset_index()
            .set_index(inds)
            .sort_index()
            .pivot_table(index=inds)
            .to_xarray()
        )
-        self._ds["t"].attrs = {"standard_name": "time"}
-        self._ds["depth"].attrs = {
+        ds["t"].attrs = {"standard_name": "time"}
+        ds["depth"].attrs = {
            "standard_name": "depth",
            "axis": "Z",
        }
-        self._ds["longitude"] = self.metadata["lon"]
-        self._ds["longitude"].attrs = {"standard_name": "longitude"}
-        self._ds["latitude"] = self.metadata["lat"]
-        self._ds["latitude"].attrs = {"standard_name": "latitude"}
-        self._ds = self._ds.assign_coords(
-            {"longitude": self._ds["longitude"], "latitude": self._ds["latitude"]}
+        metadata = self._get_dataset_metadata(stationid)
+
+        ds["longitude"] = metadata["lng"]
+        ds["longitude"].attrs = {"standard_name": "longitude"}
+        ds["latitude"] = metadata["lat"]
+        ds["latitude"].attrs = {"standard_name": "latitude"}
+        ds = ds.assign_coords(
+            {"longitude": ds["longitude"], "latitude": ds["latitude"]}
        )
-        if self._process_adcp:
-            self.process_adcp()
-
-    def process_adcp(self):
+        if process_adcp:
+            ds = self.process_adcp(metadata, ds)
+
+        return ds
+
+    def process_adcp(self, metadata, ds):
        """Process ADCP data.
        Returns
        -------
        Dataset
        With u and v, ualong and vacross, and subtidal versions ualong_subtidal, vacross_subtidal
        """
-        theta = self.source.metadata["flood_direction_degrees"]
-        self._ds["u"] = (
-            np.cos(np.deg2rad(self._ds.cf["dir"])) * self._ds.cf["speed"] / 100
+        theta = metadata["deployments"]["flood_direction_degrees"]
+        ds["u"] = (
+            np.cos(np.deg2rad(ds.cf["dir"])) * ds.cf["speed"] / 100
        )
-        self._ds["v"] = (
-            np.sin(np.deg2rad(self._ds.cf["dir"])) * self._ds.cf["speed"] / 100
+        ds["v"] = (
+            np.sin(np.deg2rad(ds.cf["dir"])) * ds.cf["speed"] / 100
        )
-        self._ds["ualong"] = self._ds["u"] * np.cos(np.deg2rad(theta)) + self._ds[
+        ds["ualong"] = ds["u"] * np.cos(np.deg2rad(theta)) + ds[
            "v"
        ] * np.sin(np.deg2rad(theta))
-        self._ds["vacross"] = -self._ds["u"] * np.sin(np.deg2rad(theta)) + self._ds[
+        ds["vacross"] = -ds["u"] * np.sin(np.deg2rad(theta)) + ds[
            "v"
        ] * np.cos(np.deg2rad(theta))
-        self._ds["s"] /= 100
-        self._ds["s"].attrs = {"standard_name": "sea_water_speed", "units": "m s-1"}
-        self._ds["d"].attrs = {
+        ds["s"] /= 100
+        ds["s"].attrs = {"standard_name": "sea_water_speed", "units": "m s-1"}
+        ds["d"].attrs = {
            "standard_name": "sea_water_velocity_to_direction",
            "units": "degree",
        }
-        self._ds["u"].attrs = {
+        ds["u"].attrs = {
            "standard_name": "eastward_sea_water_velocity",
            "units": "m s-1",
        }
-        self._ds["v"].attrs = {
+        ds["v"].attrs = {
            "standard_name": "northward_sea_water_velocity",
            "units": "m s-1",
        }
-        self._ds["ualong"].attrs = {
+        ds["ualong"].attrs = {
            "Long name": "Along channel velocity",
            "units": "m s-1",
        }
-        self._ds["vacross"].attrs = {
+        ds["vacross"].attrs = {
            "Long name": "Across channel velocity",
            "units": "m s-1",
        }

        # calculate subtidal velocities
-        self._ds["ualong_subtidal"] = tidal_filter(self._ds["ualong"])
-        self._ds["vacross_subtidal"] = tidal_filter(self._ds["vacross"])
-
-    def to_dask(self):
-        """Read data."""
-        self.read()
-        return self._ds
-
-    def read(self):
-        """Read data."""
-        if self._ds is None:
-            # self._load_metadata()
-            self._load()
-        return self._ds
-
-    def _close(self):
-        self._ds = None
-        self._schema = None
+        ds["ualong_subtidal"] = tidal_filter(ds["ualong"])
+        ds["vacross_subtidal"] = tidal_filter(ds["vacross"])
+
+        return ds
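A note on the `process_adcp` math above: the along- and across-channel components are a standard rotation of the (u, v) pair by the flood direction theta (`flood_direction_degrees` from the station metadata). Transcribing the code's own convention (it decomposes speed s and direction d with cos for u and sin for v, and divides by 100 to convert CO-OPS cm/s speeds to m/s):

```latex
\begin{align*}
u &= \tfrac{s}{100}\,\cos d, &
v &= \tfrac{s}{100}\,\sin d,\\
u_{\mathrm{along}} &= u\cos\theta + v\sin\theta, &
v_{\mathrm{across}} &= -u\sin\theta + v\cos\theta.
\end{align*}
```

`tidal_filter` (built on the pl33-based `plfilt` class below) is then applied to `ualong` and `vacross` to produce the subtidal series.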


class plfilt(object):
