More work on getting Dataflow to run #9

Merged
merged 67 commits into from
Aug 23, 2024
Changes from 66 commits
71140d0
Fix recipe_id naming
jbusecke Jul 12, 2024
1ed8bba
Update catalog.yaml
jbusecke Jul 12, 2024
6c4f75d
Update catalog.yaml
jbusecke Jul 12, 2024
cc99385
Update meta.yaml
jbusecke Jul 12, 2024
d39ce90
fix dependencies
jbusecke Jul 12, 2024
66b4088
bump pgf-recipes
jbusecke Jul 12, 2024
6816b1a
Update config_dataflow.py
jbusecke Jul 12, 2024
92102ab
move final output to persistent bucket
jbusecke Jul 12, 2024
363fb00
Re add logic for copy stage
jbusecke Jul 12, 2024
3c183f1
Update eNATL60.py
jbusecke Jul 12, 2024
e9ae02d
remove prune and reduce machine size
jbusecke Jul 12, 2024
b19e7a3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 12, 2024
6152eb6
Update eNATL60.py
jbusecke Jul 12, 2024
57d68ed
Update eNATL60.py
jbusecke Jul 12, 2024
9e27af0
Update eNATL60.py
jbusecke Jul 12, 2024
4f89453
Update config_dataflow.py
jbusecke Jul 22, 2024
4258766
Update eNATL60.py
jbusecke Jul 23, 2024
de123d0
Update eNATL60.py
jbusecke Jul 23, 2024
558b7c0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 23, 2024
ec2da10
Update eNATL60.py
jbusecke Jul 23, 2024
49728c3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 23, 2024
4f64075
Update eNATL60.py
jbusecke Jul 23, 2024
dc648fd
converts cftime.DatetimeGregorian datetime64[ns]
norlandrhagen Aug 8, 2024
d3c8464
pruned + job name
norlandrhagen Aug 8, 2024
09ae870
adds copy stage & echo
norlandrhagen Aug 8, 2024
f60775c
noprune
norlandrhagen Aug 8, 2024
da43ea3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 8, 2024
d9cf730
hardcode dataflow name + prune
norlandrhagen Aug 9, 2024
04c119b
Merge branch 'recipe-id-wo-underscore' of https://github.com/leap-stc…
norlandrhagen Aug 9, 2024
6c592b7
1 month + prod copy
norlandrhagen Aug 9, 2024
15b4ad9
hardcode naming
norlandrhagen Aug 9, 2024
f9d03e8
naming nit
norlandrhagen Aug 9, 2024
72861b2
max_concurrency=1 & prune off
norlandrhagen Aug 13, 2024
023d13a
prune bool - does this work
norlandrhagen Aug 13, 2024
47ecf4c
disable prime, add e2-highmem-4
norlandrhagen Aug 13, 2024
8ef929f
bump machine size
norlandrhagen Aug 13, 2024
4fb61ff
new approach
norlandrhagen Aug 13, 2024
aff5758
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 13, 2024
84caede
lint
norlandrhagen Aug 13, 2024
55a8a60
Merge branch 'recipe-id-wo-underscore' of https://github.com/leap-stc…
norlandrhagen Aug 13, 2024
39bfabe
pooch
norlandrhagen Aug 13, 2024
8ce62e1
pooch 5 timesteps
norlandrhagen Aug 13, 2024
4cb16c9
update reqs
norlandrhagen Aug 13, 2024
4b18288
5 to 32 days
norlandrhagen Aug 13, 2024
ee1dbaa
smaller worker
norlandrhagen Aug 14, 2024
7159713
trying out ARM machine
norlandrhagen Aug 14, 2024
b7d28e3
small standard worker
norlandrhagen Aug 14, 2024
23a37a9
single worker + highmem
norlandrhagen Aug 14, 2024
4ddfd92
task
norlandrhagen Aug 14, 2024
c529398
disk size
norlandrhagen Aug 14, 2024
5c41c75
300GB storage
norlandrhagen Aug 14, 2024
95211e4
120 days
norlandrhagen Aug 14, 2024
39b7d1f
weird z3 machine
norlandrhagen Aug 14, 2024
642af62
region for z3
norlandrhagen Aug 14, 2024
1f91f7a
use_shuffle = False + fork of runner with new opts
norlandrhagen Aug 14, 2024
c2c8b80
nit
norlandrhagen Aug 14, 2024
26baa53
max_num_workers=1
norlandrhagen Aug 14, 2024
dcf3066
tmask
norlandrhagen Aug 14, 2024
aa09a95
shuffle off retry + 34 days
norlandrhagen Aug 14, 2024
ff79c11
nit
norlandrhagen Aug 14, 2024
b9d9ea7
pruned version, no copy w/ shuffle and ssd
norlandrhagen Aug 14, 2024
99af8c6
eNATL full recipe + shuffle no copy
norlandrhagen Aug 14, 2024
53c6683
pooch concurrent test
norlandrhagen Aug 14, 2024
231d1de
coord update
norlandrhagen Aug 15, 2024
8f3b264
1 worker
norlandrhagen Aug 15, 2024
5d32583
config update
norlandrhagen Aug 15, 2024
1c33507
incorporate review + smaller worker + prune
norlandrhagen Aug 22, 2024
3 changes: 2 additions & 1 deletion .github/workflows/deploy_recipe.yaml
@@ -25,7 +25,7 @@ jobs:
       - name: "Install dependencies"
         run: |
           python -m pip install --upgrade pip
-          pip install pangeo-forge-runner
+          pip install git+https://github.com/leap-stc/pangeo-forge-runner
       - name: "Deploy recipes"
         run: |
           pangeo-forge-runner bake \
@@ -42,6 +42,7 @@ jobs:
         # AT that point, screw it, not worth it.
         run: |
           jobname="${{ env.JOB_NAME }}"
+          echo "$JOB_NAME"
Contributor Author

Is this useful to bring over to the template feedstock?

           while true; do
             count=$(gcloud dataflow jobs list --status=active --filter="name:${jobname}" --format="value(id)" | wc -l)
             echo "Active Dataflow jobs: $count"
9 changes: 6 additions & 3 deletions configs/config_dataflow.py
@@ -4,10 +4,13 @@
 repo_path = os.environ['GITHUB_REPOSITORY']
 FEEDSTOCK_NAME = repo_path.split('/')[-1]

-c.Bake.prune = 1
+c.Bake.prune = False
 c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"
-c.DataflowBakery.use_dataflow_prime = True
-c.DataflowBakery.max_workers = 50
+c.DataflowBakery.use_dataflow_prime = False
+c.DataflowBakery.machine_type = "e2-highmem-16"
+c.DataflowBakery.disk_size_gb = 400
+c.DataflowBakery.use_shuffle = False
Contributor Author

What does this do? I am actually just curious. Again, it might be good to document this as a 'case' in the template feedstock.

Contributor

I should cut it, since I had to create a fork of pangeo-forge-runner to add it. It disables Dataflow Shuffle, which I thought had some disk space limitations, but I think I was wrong, so we can use shuffle.

+c.DataflowBakery.max_num_workers = 1
 c.DataflowBakery.use_public_ips = True
 c.DataflowBakery.service_account_email = (
     "[email protected]"
17 changes: 5 additions & 12 deletions feedstock/catalog.yaml
@@ -1,16 +1,9 @@
 # All the information important to cataloging.
-"ncviewjs:meta_yaml_url": "https://github.com/leap-stc/proto_feedstock/blob/main/feedstock/meta.yaml"
+"ncviewjs:meta_yaml_url": "https://github.com/leap-stc/eNATL_feedstock/blob/main/feedstock/meta.yaml"
 tags:
-  - my-custom-tag
   - zarr
+  - ocean
 stores:
-  - id: "small"
-    name: "The cool small Proto Dataset"
-    url: "gs://leap-scratch/data-library/feedstocks/proto_feedstock/small.zarr"
-    "ncviewjs:rechunking":
-      - path: "gs://some-bucket/small.zarr"
-        use_case: "multiscales"
-
-  - id: "large"
-    name: "The even cooler large Proto Dataset" # no pyramids
-    url: "gs://leap-scratch/data-library/feedstocks/proto_feedstock/large.zarr"
+  - id: "enatl60-blbt02"
+    name: "Needs a name"
Contributor Author

@auraoupa Can you help here? This name would show up in the LEAP catalog; see the marked portion here as an example:
[image]

+    url: "gs://leap-persistent/data-library/feedstocks/eNATL_feedstock/eNATL60-BLBT02.zarr"
93 changes: 69 additions & 24 deletions feedstock/eNATL60.py
@@ -1,36 +1,80 @@
 import xarray as xr
+import pandas as pd
 import apache_beam as beam
-from pangeo_forge_recipes.patterns import pattern_from_file_sequence
+import pooch
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
 from pangeo_forge_recipes.transforms import (
     ConsolidateMetadata,
     ConsolidateDimensionCoordinates,
     OpenURLWithFSSpec,
     OpenWithXarray,
     StoreToZarr,
 )

+from leap_data_management_utils.data_management_transforms import (
+    Copy,
+    get_catalog_store_urls,
+)

+catalog_store_urls = get_catalog_store_urls("feedstock/catalog.yaml")


+dates = pd.date_range("2009-07-01", "2010-06-30", freq="D")

+records = {
+    1: "10261988",
+    2: "10260907",
+    3: "10260980",
+    4: "10261078",
+    5: "10261126",
+    6: "10261192",
+    7: "10261274",
+    8: "10261349",
+    9: "10261461",
+    10: "10261540",
+    11: "10262356",
+    12: "10261643",
+}


+def make_full_path(time):
+    record = str(records[time.month])
+    date = (
+        "y"
+        + str(time.year)
+        + "m"
+        + str("{:02d}".format(time.month))
+        + "d"
+        + str("{:02d}".format(time.day))
+    )
+    return (
+        f"https://zenodo.org/records/{record}/files/eNATL60-BLBT02_{date}.1d_TSW_60m.nc"
+    )


+time_concat_dim = ConcatDim("time", dates)
+pattern = FilePattern(make_full_path, time_concat_dim)
+# pattern = pattern.prune(60)


-# Common Parameters
-days = range(1, 32)
-dataset_url = "https://zenodo.org/records/10513552/files"
+class OpenWithPooch(beam.PTransform):
+    @staticmethod
+    def _open_pooch(url: str) -> str:
+        return pooch.retrieve(url=url, known_hash=None)
Contributor Author

👀


-## Monthly version
-input_urls = [
-    f"{dataset_url}/eNATL60-BLBT02_y2009m07d{d:02d}.1d_TSWm_60m.nc" for d in days
-]
-pattern = pattern_from_file_sequence(input_urls, concat_dim="time")
+    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+        return pcoll | "open" >> beam.MapTuple(lambda k, v: (k, self._open_pooch(v)))


 class Preprocess(beam.PTransform):
     """Custom transform to fix invalid time dimension"""

     @staticmethod
     def _set_coords(ds: xr.Dataset) -> xr.Dataset:
-        t_new = xr.DataArray(ds.time_counter.data, dims=["time"])
-        ds = ds.assign_coords(time=t_new)
-        ds = ds.drop(["time_counter"])
-        ds = ds.set_coords(["deptht", "depthw", "nav_lon", "nav_lat", "tmask"])

+        ds = ds.rename({"time_counter": "time"})
+        ds = ds.set_coords(("nav_lat", "nav_lon"))
Contributor Author

Where did t_mask go? See #8 (comment)

Contributor

Good question. I'll rerun a subset to see what happened. We might have to regenerate.

Contributor Author

Suggested change
-        ds = ds.set_coords(("nav_lat", "nav_lon"))
+        ds = ds.set_coords(("nav_lat", "nav_lon", "t_mask"))

?

Contributor

Ah, I think I remember. I'm pretty sure some of the input NetCDF files are missing "t_mask".

+        ds.attrs["deptht"] = ds.deptht.values[0]
+        ds = ds.drop("deptht")
+        ds = ds[["vosaline", "votemper", "vovecrtz"]]
Contributor Author

Ah probably dropped here!
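
One way to handle inputs where the mask variable is only sometimes present is to promote it to a coordinate only when it exists. The sketch below is a standard-library-only illustration, not the recipe's actual code: `coords_to_promote` is a hypothetical helper, and the variable name "tmask" is an assumption taken from the removed `set_coords` line (the thread spells it "t_mask").

```python
from typing import Iterable, List


def coords_to_promote(
    available: Iterable[str],
    required: Iterable[str] = ("nav_lat", "nav_lon"),
    optional: Iterable[str] = ("tmask",),  # assumed name; the thread writes "t_mask"
) -> List[str]:
    """Return the variable names to promote to coordinates.

    Required names must be present; optional names are included only
    when the input actually carries them.
    """
    available = set(available)
    required = tuple(required)
    missing = [name for name in required if name not in available]
    if missing:
        raise KeyError(f"required coordinate variables missing: {missing}")
    return [*required, *(name for name in optional if name in available)]


# Hypothetical use inside `_set_coords` (not executed here):
#     ds = ds.set_coords(coords_to_promote(ds.variables))
print(coords_to_promote(["nav_lat", "nav_lon", "votemper", "tmask"]))
print(coords_to_promote(["nav_lat", "nav_lon", "votemper"]))
```

As far as I know, selecting data variables with a list in xarray keeps coordinates whose dimensions are covered by the selected variables, so promoting the mask before the `ds[[...]]` selection should preserve it. Files that lack the mask would still produce datasets without it, so this is worth checking on a pruned run before a full regeneration.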

         return ds

     def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
@@ -39,20 +83,21 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
         )


-eNATL60_BLBT02 = (
+eNATL60BLBT02 = (
     beam.Create(pattern.items())
-    | OpenURLWithFSSpec()
-    | OpenWithXarray(
-        xarray_open_kwargs={"use_cftime": True, "engine": "netcdf4"},
-        load=True,
-        copy_to_local=True,
-    )
+    # | OpenURLWithFSSpec(max_concurrency=1)
+    | OpenWithPooch()
+    | OpenWithXarray()
+    # xarray_open_kwargs={"use_cftime": True, "engine": "netcdf4"},
+    # load=True,
+    # copy_to_local=True,)
     | Preprocess()
     | StoreToZarr(
-        store_name="eNATL60_BLBT02.zarr",
+        store_name="eNATL60-BLBT02.zarr",
         combine_dims=pattern.combine_dim_keys,
-        target_chunks={"x": 2000, "y": 2000, "time": 2},
+        target_chunks={"time": 30, "y": 900, "x": 900},
     )
     | ConsolidateDimensionCoordinates()
     | ConsolidateMetadata()
+    | Copy(target=catalog_store_urls["enatl60-blbt02"])
 )
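
For reference, the per-day URL construction in `make_full_path` can be exercised on its own, without pandas or Beam. The sketch below copies the month-to-record mapping from the diff and swaps in `datetime.date` for the pandas timestamps the recipe uses (an assumption made only to keep the example standard-library-only); it does not check that the URLs resolve on Zenodo.

```python
from datetime import date

# Month -> Zenodo record id, copied from the `records` mapping in the diff.
RECORDS = {
    1: "10261988",
    2: "10260907",
    3: "10260980",
    4: "10261078",
    5: "10261126",
    6: "10261192",
    7: "10261274",
    8: "10261349",
    9: "10261461",
    10: "10261540",
    11: "10262356",
    12: "10261643",
}


def make_full_path(day: date) -> str:
    """Build the download URL for one daily file, keyed by the day's month."""
    record = RECORDS[day.month]
    stamp = f"y{day.year}m{day.month:02d}d{day.day:02d}"
    return f"https://zenodo.org/records/{record}/files/eNATL60-BLBT02_{stamp}.1d_TSW_60m.nc"


# First and last days of the range used in the recipe (2009-07-01 to 2010-06-30).
first = make_full_path(date(2009, 7, 1))
last = make_full_path(date(2010, 6, 30))
print(first)
print(last)
```

Because the record id depends only on the month, each calendar month of the simulation maps to exactly one Zenodo record.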
6 changes: 3 additions & 3 deletions feedstock/meta.yaml
@@ -2,8 +2,8 @@ title: "LEAP Data Library"
 description: >
   eNATL60-TSW-60m is an extraction of a very high resolution oceanic simulation of the North Atlantic performed at MEOM, IGE (FRANCE)
 recipes:
-  - id: eNATL60_BLBT02
-    object: "eNATL60:eNATL60_BLBT02"
+  - id: enatl60-blbt02
+    object: "eNATL60:eNATL60BLBT02"
 provenance:
   providers:
     - name: "Zenodo"
@@ -23,4 +23,4 @@ maintainers:
     github: jbusecke
   - name: "Charles Stern"
     orcid: 0000-0002-4078-0852
-    github: cisaacstern
+    github: cisaacstern
7 changes: 4 additions & 3 deletions feedstock/requirements.txt
@@ -1,5 +1,6 @@
-pangeo-forge-recipes==0.10.4
+pangeo-forge-recipes==0.10.7
 gcsfs
-apache-beam[gcp]
+apache-beam[gcp] >= 2.58.0
 leap-data-management-utils==0.0.12
-xarray=2024.05.0
+xarray==2024.05.0
+pooch