-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pipeline changes to run annual metrics for new GCM GLM outputs #59
Changes from 29 commits
5090c69
a81f5fe
7934ec2
c7bb31d
cb32987
557010a
c5ac5c9
3af7d77
786850c
98d0dbb
a6b96c3
b27fe85
7b77a72
25c4320
4a2201b
c91acf1
060bf31
a1ecc50
e76cf6c
50bcfdb
aa06e95
d44a025
cffceda
c5085bf
5c81a0e
ab7b790
09f8baa
c32ed42
e768cca
74810d2
847ba30
feb4169
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
R_LIBS_USER="/cxfs/projects/usgs/water/iidd/data-sci/lake-temp/lake-temperature-out/Rlib_3_6":"/opt/ohpc/pub/usgs/libs/gnu8/R/3.6.3/lib64/R/library" | ||
#R_LIBS_USER="/cxfs/projects/usgs/water/iidd/data-sci/lake-temp/lake-temperature-out/Rlib_3_6":"/opt/ohpc/pub/usgs/libs/gnu8/R/3.6.3/lib64/R/library" | ||
R_LIBS_USER="/caldera/projects/usgs/water/iidd/datasci/lake-temp/lake-temperature-out/Rlib_3_6":"/home/software/tallgrass/arc/apps/R/3.6.3/lib64/R/library" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ packages: | |
|
||
sources: | ||
- 1_fetch/src/fetch_sb_item_files.R | ||
- 1_fetch/src/copy_caldera_files.R | ||
|
||
targets: | ||
1_fetch: | ||
|
@@ -28,6 +29,9 @@ targets: | |
- 1_fetch/out/temperature_observations.zip | ||
- 1_fetch/out/pb0_matched_to_observations.zip | ||
- 1_fetch/out/pgdl_matched_to_observations.zip | ||
# climate projections glm3 data release target | ||
- 1_fetch/out/pb0_temp_projections.yml | ||
- 1_fetch/out/pb0_temp_projections_nml.rds.ind | ||
|
||
sb_dl_date: | ||
command: c(I("2021-04-08")) | ||
|
@@ -167,3 +171,40 @@ targets: | |
sb_id = I("5e774324e4b01d509270e29f"), | ||
sb_filename = I("pgdl_matched_to_observations.zip"), | ||
dummy = sb_dl_date) | ||
|
||
##-- Access files for GLM3 + GCM projections data release --## | ||
|
||
# TODO: Switch to downloading the NetCDF file(s) from ScienceBase | ||
# to use as input instead. That would be less brittle. | ||
|
||
# Identifying what lakes / GLM temp output files to use to for | ||
# lake thermal metrics summaries depends on us changing this | ||
# date to refresh the list of files and hashes. Not the most | ||
# ideal process, but the function + practicality of this approach | ||
# is a bit of a compromise right now so we can complete this work. | ||
caldera_access_date: | ||
command: c(I('2022-02-16')) | ||
|
||
# These files from our process-models pipeline are also pushed to ScienceBase | ||
# item `6206d3c2d34ec05caca53071` but since we are running this on Tallgrass | ||
# we have access to Caldera and can skip the need to re-download. | ||
pb0_temp_projections_feathers: | ||
command: list.files( | ||
path = I('../lake-temperature-process-models/3_extract/out'), | ||
pattern = I('GLM_nhdhr_(.*)(GFDL|IPSL|MIROC5|MRI|ACCESS|CNRM).feather'), | ||
full.names = I(TRUE)) | ||
depends: | ||
- caldera_access_date | ||
1_fetch/out/pb0_temp_projections.yml: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was going to suggest you combine this target with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, yes. In |
||
command: sc_indicate(target_name, data_file = pb0_temp_projections_feathers) | ||
depends: | ||
- caldera_access_date | ||
|
||
# These files from our process-models pipeline are also pushed to ScienceBase | ||
# item `6206d3c2d34ec05caca53071` but since we are running this on Tallgrass | ||
# we have access to Caldera and can skip the need to re-download. | ||
1_fetch/out/pb0_temp_projections_nml.rds.ind: | ||
command: copy_caldera_files(target_name, files_in = I('../lake-temperature-process-models/1_prep/in/nml_list.rds')) | ||
depends: | ||
- caldera_access_date | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
|
||
#' Use `out_dir` if you are copying multiple files and saving | ||
#' a hash table as the ind. Leave as `NULL` if you are only | ||
#' copying and saving one file. | ||
copy_caldera_files <- function(out_ind, files_in, out_dir = NULL) { | ||
|
||
if(is.null(out_dir)) { | ||
files_out <- as_data_file(out_ind) | ||
} else { | ||
files_out <- file.path(out_dir, basename(files_in)) | ||
} | ||
|
||
file.copy(from = files_in, to = files_out) | ||
sc_indicate(out_ind, data_file = files_out) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,10 @@ extract_morphometry <- function(config_fn) { | |
return(morphometry_out) | ||
} | ||
|
||
nml_to_morphometry <- function(in_ind) { | ||
purrr::map(readRDS(as_data_file(in_ind)), `[`, c("latitude", "longitude", "H", "A")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just a check here to verify your functions that are related to morphometry, H/A still work the same way now that we've adjusted those all to have the elevations of the lakes taken into account (instead of all being 320). I'm 99% certain you are fine in all of these cases, but asking here in case you think it may be an issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for raising this point. I have always just used H/A and then converted later to hypso using max H. I don't think that change impacts my uses here. |
||
} | ||
|
||
unzip_data <- function(target_name, data_file, out_dir) { | ||
# Sometimes (as is the case with irradiance and clarity), the incoming | ||
# file has multiple zip files that need to be unpacked and saved | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ target_default: 3_summarize | |
|
||
packages: | ||
- arrow | ||
- data.table | ||
- dplyr | ||
- readr | ||
- tidyr | ||
|
@@ -12,6 +13,7 @@ sources: | |
- 2_process/src/calculate_toha.R # Has the benthic area fxn & resample_hypso | ||
- 3_summarize/src/annual_thermal_metrics.R | ||
- 3_summarize/src/do_annual_thermal_metric_tasks.R | ||
- 3_summarize/src/plot_annual_metric_summaries.R | ||
|
||
targets: | ||
3_summarize: | ||
|
@@ -25,6 +27,8 @@ targets: | |
# multi-state glm2 data release targets | ||
- 3_summarize/out/annual_metrics_glm2pb0.csv | ||
- 3_summarize/out/annual_metrics_glm2pball.csv | ||
# climate projections glm3 data release target | ||
- 3_summarize/out/annual_metrics_glm3pb0_gcms.csv | ||
|
||
##-- MN TOHA Data Release --## | ||
|
||
|
@@ -120,3 +124,32 @@ targets: | |
'3_summarize/src/annual_thermal_metrics.R') | ||
depends: | ||
- temp_ranges | ||
|
||
##-- GLM3 + GCM Projections Data Release --## | ||
|
||
# Do all GCMs at once and combine into a single file | ||
3_summarize/out/annual_metrics_glm3pb0_gcms.csv: | ||
command: do_annual_metrics_multi_lake( | ||
final_target = target_name, | ||
site_file_yml = "1_fetch/out/pb0_temp_projections.yml", | ||
ice_file_yml = I(NULL), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assume this is because the ice data is now included in the temperature feathers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct |
||
n_cores = 79, | ||
morphometry = glm3_pb0gcm_proj_morphometry, | ||
temp_ranges = temp_ranges, | ||
site_file_regex = I("(GLM)_(.*)_(.*).feather"), | ||
tmpdir_suffix = I("_proj"), | ||
model_id_colname = I("GCM"), | ||
suffix_as_model_id = I(TRUE), | ||
'2_process/src/calculate_toha.R', | ||
'3_summarize/src/annual_thermal_metrics.R', | ||
'3_summarize/src/do_annual_thermal_metric_tasks.R') | ||
|
||
# Verify outputs through some plots: | ||
3_summarize/out_plots/annual_metrics_plots_gcms.yml: | ||
command: plot_annual_metric_summaries( | ||
target_name, | ||
in_file = "3_summarize/out/annual_metrics_glm3pb0_gcms.csv", | ||
target_dir = I("3_summarize/out_plots"), | ||
model_id_colname = I("GCM")) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,15 @@ | ||
|
||
do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_yml, n_cores, morphometry, temp_ranges, ..., | ||
site_file_regex = NULL, ice_file_regex = NULL, tmpdir_suffix = "") { | ||
site_file_regex = NULL, ice_file_regex = NULL, tmpdir_suffix = "", | ||
model_id_colname = NULL, suffix_as_model_id = TRUE) { | ||
|
||
site_files <- get_filenames_from_ind(site_file_yml) | ||
ice_files <- get_filenames_from_ind(ice_file_yml) | ||
|
||
if(!is.null(ice_file_yml)) { | ||
ice_files <- get_filenames_from_ind(ice_file_yml) | ||
} else { | ||
ice_files <- "" | ||
} | ||
|
||
# Each node on a Yeti normal partition has a max of 20 cores; nodes on Yeti UV partition do not have that same limit | ||
if(n_cores > 20) message("If using a node on the Yeti normal partition, you need to decrease n_cores to 20 or less") | ||
|
@@ -25,14 +31,14 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_y | |
|
||
temp_ranges_file <- sprintf("temp_ranges.rds") | ||
saveRDS(temp_ranges, temp_ranges_file) | ||
|
||
# Define task table rows | ||
tasks <- tibble(wtr_filename = site_files) %>% | ||
extract(wtr_filename, c('prefix','site_id','suffix'), site_file_regex, remove = FALSE) %>% | ||
left_join(extract(tibble(ice_filename = ice_files), ice_filename, c('site_id'), ice_file_regex, remove = FALSE), by = "site_id") %>% | ||
select(site_id, wtr_filename, ice_filename, prefix) | ||
select(site_id, wtr_filename, ice_filename, model_id = matches(ifelse(suffix_as_model_id, "suffix", "prefix"))) | ||
|
||
model_type <- pull(tasks, prefix) %>% unique() | ||
model_type <- pull(tasks, model_id) %>% unique() | ||
|
||
# Define task table columns | ||
|
||
|
@@ -50,40 +56,95 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_y | |
} | ||
) | ||
|
||
calc_annual_metrics <- create_task_step( | ||
step_name = 'calc_annual_metrics', | ||
# Depending on how many models per site, this will be a list of one or more steps | ||
# Doing multiple here rather than another task table per model so that the morphometry files can be shared | ||
calc_annual_metrics <- purrr::map(model_type, function(model_type_i) { create_task_step( | ||
step_name = sprintf('calc_annual_metrics_%s', model_type_i), | ||
target_name = function(task_name, step_name, ...) { | ||
sprintf("3_summarize/tmp%s/%s_%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, model_type, task_name) | ||
sprintf("3_summarize/tmp%s/%s_%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, model_type_i, task_name) | ||
}, | ||
command = function(..., task_name, steps) { | ||
task_info <- filter(tasks, site_id == task_name) | ||
task_info <- filter(tasks, site_id == task_name, model_id == model_type_i) | ||
# Return empty string (not `NULL` to avoid warnings) if there is no data for this | ||
# site_id + model_id combo (not all combos were successful runs) | ||
if(nrow(task_info) == 0) return("") | ||
psprintf("calculate_annual_metrics_per_lake(", | ||
"out_ind = target_name,", | ||
"site_id = I('%s')," = task_name, | ||
"site_file = '%s'," = task_info$wtr_filename, | ||
"ice_file = '%s'," = task_info$ice_filename, | ||
"ice_file = %s," = ifelse(is.na(task_info$ice_filename), 'I(NULL)', sprintf("'%s'", task_info$ice_filename)), | ||
"temp_ranges_file = '%s'," = temp_ranges_file, | ||
"morphometry_ind = '%s'," = steps[["split_morphometry"]]$target_name, | ||
# Doesn't actually print to console with `loop_tasks` but let's you see if you are troubleshooting individual files | ||
"verbose = I(TRUE))" | ||
) | ||
} | ||
) | ||
)}) | ||
|
||
# If there is more than one model type, we should combine them per task | ||
# so that each task has the same "final" step that we can use for `final_steps` | ||
if(length(calc_annual_metrics) > 1) { | ||
combine_model_thermal_metrics <- scipiper::create_task_step( | ||
step_name = 'combine_model_thermal_metrics', | ||
target_name = function(task_name, step_name, ...){ | ||
sprintf("3_summarize/tmp%s/%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, task_name) | ||
}, | ||
command = function(..., task_name, steps){ | ||
calc_annual_metrics_steps <- steps[grepl('calc_annual_metrics_', names(steps))] | ||
calc_annual_metrics_targets_str <- paste(sprintf("'%s'", sapply(calc_annual_metrics_steps, `[[`, "target_name")), collapse = ",") | ||
psprintf("combine_model_thermal_metrics(", | ||
"out_ind = target_name,", | ||
"model_id_colname = I('%s')," = model_id_colname, | ||
"%s)" = calc_annual_metrics_targets_str | ||
) | ||
} | ||
) | ||
|
||
task_steps_list <- c(list(split_morphometry), calc_annual_metrics, list(combine_model_thermal_metrics)) | ||
final_step_name <- "combine_model_thermal_metrics" | ||
} else { | ||
task_steps_list <- list(split_morphometry, calc_annual_metrics) | ||
final_step_name <- "calc_annual_metrics" | ||
} | ||
|
||
# Create the task plan | ||
task_plan <- create_task_plan( | ||
task_names = tasks$site_id, | ||
task_steps = list(split_morphometry, calc_annual_metrics), | ||
final_steps = c('calc_annual_metrics'), | ||
task_plan_all_combos <- create_task_plan( | ||
task_names = unique(tasks$site_id), | ||
task_steps = task_steps_list, | ||
final_steps = final_step_name, | ||
add_complete = FALSE) | ||
|
||
# Remove any steps with empty commands (there wasn't a matching site_id + model_id combo) | ||
task_plan <- lapply(task_plan_all_combos, function(task_def) { | ||
missing_command <- sapply(task_def$steps, function(step_def) nchar(step_def$command) == 0) | ||
missing_command_target_names <- sapply(task_def$steps[missing_command], `[[`, "target_name") | ||
|
||
# Remove the steps with missing commands | ||
task_def$steps[missing_command] <- NULL | ||
|
||
# Also remove in the final step for combining if necessary by removing the filepaths to | ||
# target names that don't exist | ||
task_final_step <- task_def$steps[[final_step_name]] | ||
if(!is.null(task_final_step)) { | ||
command_drop_target_names <- gsub(paste(missing_command_target_names, collapse="|"), "", task_final_step$command) | ||
command_drop_target_names <- gsub(",''|'',", "", command_drop_target_names) # Remove leftover string syntax | ||
task_def$steps[[final_step_name]]$command <- command_drop_target_names | ||
} | ||
|
||
return(task_def) | ||
}) | ||
|
||
# Need to make sure `task_plan` attributes make it back | ||
attributes(task_plan) <- attributes(task_plan_all_combos) | ||
|
||
|
||
# Create the task remakefile | ||
task_makefile <- sprintf('3_summarize_%s_metric_tasks.yml', model_type) | ||
task_makefile <- sprintf('3_summarize_%s_metric_tasks.yml', paste(model_type, collapse = ".")) # Collapse string if doing tasks for more than one model type | ||
create_task_makefile( | ||
task_plan = task_plan, | ||
makefile = task_makefile, | ||
sources = c(...), | ||
packages = c('tidyverse', 'purrr', 'readr', 'scipiper', 'arrow'), | ||
packages = c('tidyverse', 'purrr', 'readr', 'scipiper', 'arrow', 'data.table'), | ||
final_targets = final_target, | ||
finalize_funs = 'combine_thermal_metrics', | ||
as_promises = TRUE, | ||
|
@@ -92,7 +153,8 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_y | |
# Build the tasks | ||
loop_tasks(task_plan = task_plan, | ||
task_makefile = task_makefile, | ||
num_tries = 3, | ||
task_names = unique(tasks$site_id), | ||
num_tries = 1, | ||
n_cores = n_cores) | ||
|
||
# Clean up files created | ||
|
@@ -111,6 +173,20 @@ combine_thermal_metrics <- function(target_name, ...) { | |
purrr::map(list(...), function(ind) readRDS(as_data_file(ind))) %>% purrr::reduce(bind_rows) %>% readr::write_csv(target_name) | ||
} | ||
|
||
combine_model_thermal_metrics <- function(out_ind, model_id_colname, ...) { | ||
data_file <- as_data_file(out_ind) | ||
purrr::map(list(...), function(ind) { | ||
rds_file <- as_data_file(ind) | ||
file_pattern <- "(?<modelid>.*)_(?<sitenum>nhdhr_.*)_annual_thermal_metrics.rds" # based on target_name of `calc_annual_metrics` step | ||
readRDS(rds_file) %>% | ||
mutate(model_id = str_match(basename(rds_file), file_pattern)[2]) %>% | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessary for a change, but wondering if But I'm not sure what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I always forget about |
||
select(site_id, !!model_id_colname := model_id, everything()) | ||
}) %>% | ||
purrr::reduce(bind_rows) %>% | ||
saveRDS(data_file) | ||
sc_indicate(ind_file = out_ind, data_file = data_file) | ||
} | ||
|
||
split_and_save_morphometry <- function(out_ind, morphometry_file, site_id) { | ||
data_file <- as_data_file(out_ind) | ||
morphometry <- readRDS(morphometry_file) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Visualize output | ||
|
||
plot_annual_metric_summaries <- function(target_name, in_file, target_dir, model_id_colname) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. skipping over reviewing this plotting function |
||
|
||
if(!dir.exists(target_dir)) dir.create(target_dir) | ||
|
||
annual_metrics_data <- readr::read_csv(in_file) | ||
|
||
cols2plot <- names(annual_metrics_data)[which(!grepl(sprintf("site_id|GCM|year|_dt|_date|%s", model_id_colname), names(annual_metrics_data)))] | ||
|
||
annual_metrics_long <- annual_metrics_data %>% | ||
select(-matches("_dt|_date")) %>% | ||
pivot_longer(all_of(cols2plot), names_to = "metric") | ||
|
||
plot_groups <- split(cols2plot, ceiling(seq_along(cols2plot)/10)) | ||
files_out <- purrr::map(plot_groups, function(i) { | ||
p <- annual_metrics_long %>% | ||
filter(metric %in% i) %>% | ||
ggplot(aes(x = year, y = value)) + | ||
facet_grid(metric ~ ., scales = "free_y") + | ||
geom_point(alpha = 0.2, stroke = 0, shape = 16, size = 2) | ||
ggsave(sprintf("%s/%s_THROUGH_%s.png", target_dir, i[1], i[10]), | ||
plot = p, width = 5, height = 8) | ||
}) | ||
|
||
scipiper::sc_indicate(target_name, data_file = unlist(files_out)) | ||
} | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just remove the commented line since this is versioned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yep!