More work on getting Dataflow to run #9
Changed file 1: pangeo-forge-runner configuration (Dataflow bakery settings)

@@ -4,10 +4,12 @@
 repo_path = os.environ['GITHUB_REPOSITORY']
 FEEDSTOCK_NAME = repo_path.split('/')[-1]
 
-c.Bake.prune = 1
+c.Bake.prune = True
 c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"
-c.DataflowBakery.use_dataflow_prime = True
-c.DataflowBakery.max_workers = 50
+c.DataflowBakery.use_dataflow_prime = False
+c.DataflowBakery.machine_type = "e2-highmem-8"  # 1 year had max 50GB of ram on single worker. This is 64GB
+c.DataflowBakery.disk_size_gb = 400
+c.DataflowBakery.max_num_workers = 1
+c.DataflowBakery.use_public_ips = True
 c.DataflowBakery.service_account_email = (
     "[email protected]"
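Not part of the diff: a quick local sanity check of these traitlets settings. It assumes the config above lives at feedstock/config.py (an assumption; the path is not shown in this diff) and that GITHUB_REPOSITORY is set, since the file reads it.

```python
# Hypothetical local check of the runner config; the file path is an assumption.
import os
from traitlets.config.loader import PyFileConfigLoader

os.environ.setdefault("GITHUB_REPOSITORY", "leap-stc/eNATL_feedstock")  # the config file reads this
cfg = PyFileConfigLoader("feedstock/config.py").load_config()

assert cfg.Bake.prune is True
assert cfg.DataflowBakery.machine_type == "e2-highmem-8"  # 8 vCPUs, 64 GB RAM
print(cfg.DataflowBakery)
```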
Changed file 2: feedstock/catalog.yaml

@@ -1,16 +1,9 @@
 # All the information important to cataloging.
-"ncviewjs:meta_yaml_url": "https://github.com/leap-stc/proto_feedstock/blob/main/feedstock/meta.yaml"
+"ncviewjs:meta_yaml_url": "https://github.com/leap-stc/eNATL_feedstock/blob/main/feedstock/meta.yaml"
 tags:
-  - my-custom-tag
   - zarr
+  - ocean
 stores:
-  - id: "small"
-    name: "The cool small Proto Dataset"
-    url: "gs://leap-scratch/data-library/feedstocks/proto_feedstock/small.zarr"
-    "ncviewjs:rechunking":
-      - path: "gs://some-bucket/small.zarr"
-        use_case: "multiscales"
-
-  - id: "large"
-    name: "The even cooler large Proto Dataset" # no pyramids
-    url: "gs://leap-scratch/data-library/feedstocks/proto_feedstock/large.zarr"
+  - id: "enatl60-blbt02"
+    name: "Needs a name"
+    url: "gs://leap-persistent/data-library/feedstocks/eNATL_feedstock/eNATL60-BLBT02.zarr"

Review comment on `name: "Needs a name"`:
> @auraoupa Can you help here? This name would show up in the LEAP catalog, see the marked portion here as example
Changed file 3: the recipe module (Python)

@@ -1,36 +1,80 @@
 import xarray as xr
+import pandas as pd
 import apache_beam as beam
-from pangeo_forge_recipes.patterns import pattern_from_file_sequence
+import pooch
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
 from pangeo_forge_recipes.transforms import (
     ConsolidateMetadata,
     ConsolidateDimensionCoordinates,
     OpenURLWithFSSpec,
     OpenWithXarray,
     StoreToZarr,
 )
+
+from leap_data_management_utils.data_management_transforms import (
+    Copy,
+    get_catalog_store_urls,
+)
+
+catalog_store_urls = get_catalog_store_urls("feedstock/catalog.yaml")
+
+
+dates = pd.date_range("2009-07-01", "2010-06-30", freq="D")
+
+records = {
+    1: "10261988",
+    2: "10260907",
+    3: "10260980",
+    4: "10261078",
+    5: "10261126",
+    6: "10261192",
+    7: "10261274",
+    8: "10261349",
+    9: "10261461",
+    10: "10261540",
+    11: "10262356",
+    12: "10261643",
+}
+
+
+def make_full_path(time):
+    record = str(records[time.month])
+    date = (
+        "y"
+        + str(time.year)
+        + "m"
+        + str("{:02d}".format(time.month))
+        + "d"
+        + str("{:02d}".format(time.day))
+    )
+    return (
+        f"https://zenodo.org/records/{record}/files/eNATL60-BLBT02_{date}.1d_TSW_60m.nc"
+    )
+
+
+time_concat_dim = ConcatDim("time", dates)
+pattern = FilePattern(make_full_path, time_concat_dim)
+# pattern = pattern.prune(60)
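As a quick illustration (not part of the diff), this is the URL the new `FilePattern` generates for the first date in the range; the Zenodo record id comes from the `records` mapping for July:

```python
import pandas as pd

# Assumes make_full_path from the recipe above is in scope.
url = make_full_path(pd.Timestamp("2009-07-01"))
print(url)
# https://zenodo.org/records/10261274/files/eNATL60-BLBT02_y2009m07d01.1d_TSW_60m.nc
```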
 
-# Common Parameters
-days = range(1, 32)
-dataset_url = "https://zenodo.org/records/10513552/files"
-## Monthly version
-input_urls = [
-    f"{dataset_url}/eNATL60-BLBT02_y2009m07d{d:02d}.1d_TSWm_60m.nc" for d in days
-]
-pattern = pattern_from_file_sequence(input_urls, concat_dim="time")
+
+
+class OpenWithPooch(beam.PTransform):
+    @staticmethod
+    def _open_pooch(url: str) -> str:
+        return pooch.retrieve(url=url, known_hash=None)
+
+    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+        return pcoll | "open" >> beam.MapTuple(lambda k, v: (k, self._open_pooch(v)))

Review comment on `return pooch.retrieve(url=url, known_hash=None)`:
> 👀
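Background on the new transform (not from the diff): `pooch.retrieve` downloads the URL to a local cache directory and returns the local file path, and `known_hash=None` skips checksum verification. Per pattern element, it does roughly this:

```python
import pooch

# Download (or reuse a cached copy of) one input file and return its local path.
local_path = pooch.retrieve(
    url="https://zenodo.org/records/10261274/files/eNATL60-BLBT02_y2009m07d01.1d_TSW_60m.nc",
    known_hash=None,  # no checksum check
)
print(local_path)
```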
 class Preprocess(beam.PTransform):
     """Custom transform to fix invalid time dimension"""
 
     @staticmethod
     def _set_coords(ds: xr.Dataset) -> xr.Dataset:
-        t_new = xr.DataArray(ds.time_counter.data, dims=["time"])
-        ds = ds.assign_coords(time=t_new)
-        ds = ds.drop(["time_counter"])
-        ds = ds.set_coords(["deptht", "depthw", "nav_lon", "nav_lat", "tmask"])
+        ds = ds.rename({"time_counter": "time"})
+        ds = ds.set_coords(("nav_lat", "nav_lon"))

Review thread on the `_set_coords` changes:
> Where did …
> good question, I'll rerun a subset to see what was up. We might have to regen.
> Suggested change
> ?
> Ah I think I remember. I'm pretty sure some of the input netcdf files are missing "t_mask".

+        ds.attrs["deptht"] = ds.deptht.values[0]
+        ds = ds.drop("deptht")
+        ds = ds[["vosaline", "votemper", "vovecrtz"]]

Review comment on `ds = ds[["vosaline", "votemper", "vovecrtz"]]`:
> Ah probably dropped here!

         return ds
 
     def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
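A rough standalone check of the rewritten `_set_coords` (not part of the PR). The dataset below is a made-up stand-in for one eNATL60 file, with tiny shapes chosen only to exercise the renaming, coordinate promotion, and variable subsetting; `Preprocess` is the class from the recipe above.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical miniature input; real files have far larger x/y grids.
ds = xr.Dataset(
    data_vars={
        "vosaline": (("time_counter", "y", "x"), np.zeros((2, 3, 4))),
        "votemper": (("time_counter", "y", "x"), np.zeros((2, 3, 4))),
        "vovecrtz": (("time_counter", "y", "x"), np.zeros((2, 3, 4))),
        "nav_lat": (("y", "x"), np.zeros((3, 4))),
        "nav_lon": (("y", "x"), np.zeros((3, 4))),
        "deptht": ("deptht", [60.0]),
    },
    coords={"time_counter": pd.date_range("2009-07-01", periods=2)},
)

out = Preprocess._set_coords(ds)
print(out.dims)             # "time_counter" renamed to "time"
print(list(out.data_vars))  # only vosaline, votemper, vovecrtz remain
print(out.attrs["deptht"])  # 60.0, recorded as an attribute before deptht is dropped
```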
@@ -39,20 +83,21 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
         )
 
 
-eNATL60_BLBT02 = (
+eNATL60BLBT02 = (
     beam.Create(pattern.items())
-    | OpenURLWithFSSpec()
-    | OpenWithXarray(
-        xarray_open_kwargs={"use_cftime": True, "engine": "netcdf4"},
-        load=True,
-        copy_to_local=True,
-    )
+    # | OpenURLWithFSSpec(max_concurrency=1)
+    | OpenWithPooch()
+    | OpenWithXarray()
+    # xarray_open_kwargs={"use_cftime": True, "engine": "netcdf4"},
+    # load=True,
+    # copy_to_local=True,)
     | Preprocess()
     | StoreToZarr(
-        store_name="eNATL60_BLBT02.zarr",
+        store_name="eNATL60-BLBT02.zarr",
         combine_dims=pattern.combine_dim_keys,
-        target_chunks={"x": 2000, "y": 2000, "time": 2},
+        target_chunks={"time": 30, "y": 900, "x": 900},
     )
     | ConsolidateDimensionCoordinates()
     | ConsolidateMetadata()
+    | Copy(target=catalog_store_urls["enatl60-blbt02"])
 )
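A back-of-envelope check (not from the PR) of the new `target_chunks`: assuming float32 data and no depth dimension left in the stored variables, each chunk of each variable comes out to roughly 93 MiB, close to the commonly targeted ~100 MB chunk size for Zarr stores.

```python
# target_chunks = {"time": 30, "y": 900, "x": 900}
elements_per_chunk = 30 * 900 * 900          # 24_300_000 values
bytes_per_chunk = elements_per_chunk * 4     # assuming float32 (4 bytes per value)
print(f"{bytes_per_chunk / 2**20:.0f} MiB")  # ~93 MiB per chunk per variable
```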
Changed file 4: requirements.txt

@@ -1,5 +1,6 @@
-pangeo-forge-recipes==0.10.4
+pangeo-forge-recipes==0.10.7
 gcsfs
-apache-beam[gcp]
+apache-beam[gcp] >= 2.58.0
 leap-data-management-utils==0.0.12
-xarray=2024.05.0
+xarray==2024.05.0
+pooch
Review comment:
> Is this useful to bring over to the template feedstock?

Reply:
> leap-stc/LEAP_template_feedstock#55