-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pipeline changes to run annual metrics for new GCM GLM outputs #59
Changes from all commits
5090c69
a81f5fe
7934ec2
c7bb31d
cb32987
557010a
c5ac5c9
3af7d77
786850c
98d0dbb
a6b96c3
b27fe85
7b77a72
25c4320
4a2201b
c91acf1
060bf31
a1ecc50
e76cf6c
50bcfdb
aa06e95
d44a025
cffceda
c5085bf
5c81a0e
ab7b790
09f8baa
c32ed42
e768cca
74810d2
847ba30
feb4169
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
R_LIBS_USER="/cxfs/projects/usgs/water/iidd/data-sci/lake-temp/lake-temperature-out/Rlib_3_6":"/opt/ohpc/pub/usgs/libs/gnu8/R/3.6.3/lib64/R/library" | ||
R_LIBS_USER="/caldera/projects/usgs/water/iidd/datasci/lake-temp/lake-temperature-out/Rlib_3_6":"/home/software/tallgrass/arc/apps/R/3.6.3/lib64/R/library" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
|
||
#' Indicate all files from a single directory into a single hash table | ||
indicate_dir_files <- function(out_ind, file_dir, file_pattern) { | ||
files_to_indicate <- list.files(path = file_dir, | ||
pattern = file_pattern, | ||
full.names = TRUE) | ||
sc_indicate(out_ind, data_file = files_to_indicate) | ||
} | ||
|
||
#' Use `out_dir` if you are copying multiple files and saving | ||
#' a hash table as the ind. Leave as `NULL` if you are only | ||
#' copying and saving one file. | ||
copy_caldera_files <- function(out_ind, files_in, out_dir = NULL) { | ||
|
||
if(is.null(out_dir)) { | ||
files_out <- as_data_file(out_ind) | ||
} else { | ||
files_out <- file.path(out_dir, basename(files_in)) | ||
} | ||
|
||
file.copy(from = files_in, to = files_out) | ||
sc_indicate(out_ind, data_file = files_out) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,10 @@ extract_morphometry <- function(config_fn) { | |
return(morphometry_out) | ||
} | ||
|
||
nml_to_morphometry <- function(in_ind) { | ||
purrr::map(readRDS(as_data_file(in_ind)), `[`, c("latitude", "longitude", "H", "A")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just a check here to verify your functions that are related to morphometry, H/A still work the same way now that we've adjusted those all to have the elevations of the lakes taken into account (instead of all being 320). I'm 99% certain you are fine in all of these cases, but asking here in case you think it may be an issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for raising this point. I have always just used H/A and then converted later to hypso using max H. I don't think that change impacts my uses here. |
||
} | ||
|
||
unzip_data <- function(target_name, data_file, out_dir) { | ||
# Sometimes (as is the case with irradiance and clarity), the incoming | ||
# file has multiple zip files that need to be unpacked and saved | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ target_default: 3_summarize | |
|
||
packages: | ||
- arrow | ||
- data.table | ||
- dplyr | ||
- readr | ||
- tidyr | ||
|
@@ -12,6 +13,7 @@ sources: | |
- 2_process/src/calculate_toha.R # Has the benthic area fxn & resample_hypso | ||
- 3_summarize/src/annual_thermal_metrics.R | ||
- 3_summarize/src/do_annual_thermal_metric_tasks.R | ||
- 3_summarize/src/plot_annual_metric_summaries.R | ||
|
||
targets: | ||
3_summarize: | ||
|
@@ -25,6 +27,8 @@ targets: | |
# multi-state glm2 data release targets | ||
- 3_summarize/out/annual_metrics_glm2pb0.csv | ||
- 3_summarize/out/annual_metrics_glm2pball.csv | ||
# climate projections glm3 data release target | ||
- 3_summarize/out/annual_metrics_glm3pb0_gcms.csv | ||
|
||
##-- MN TOHA Data Release --## | ||
|
||
|
@@ -120,3 +124,32 @@ targets: | |
'3_summarize/src/annual_thermal_metrics.R') | ||
depends: | ||
- temp_ranges | ||
|
||
##-- GLM3 + GCM Projections Data Release --## | ||
|
||
# Do all GCMs at once and combine into a single file | ||
3_summarize/out/annual_metrics_glm3pb0_gcms.csv: | ||
command: do_annual_metrics_multi_lake( | ||
final_target = target_name, | ||
site_file_yml = "1_fetch/out/pb0_temp_projections.yml", | ||
ice_file_yml = I(NULL), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assume this is because the ice data is now included in the temperature feathers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct |
||
n_cores = 79, | ||
morphometry = glm3_pb0gcm_proj_morphometry, | ||
temp_ranges = temp_ranges, | ||
site_file_regex = I("(GLM)_(.*)_(.*).feather"), | ||
tmpdir_suffix = I("_proj"), | ||
model_id_colname = I("GCM"), | ||
suffix_as_model_id = I(TRUE), | ||
'2_process/src/calculate_toha.R', | ||
'3_summarize/src/annual_thermal_metrics.R', | ||
'3_summarize/src/do_annual_thermal_metric_tasks.R') | ||
|
||
# Verify outputs through some plots: | ||
3_summarize/out_plots/annual_metrics_plots_gcms.yml: | ||
command: plot_annual_metric_summaries( | ||
target_name, | ||
in_file = "3_summarize/out/annual_metrics_glm3pb0_gcms.csv", | ||
target_dir = I("3_summarize/out_plots"), | ||
model_id_colname = I("GCM")) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
|
||
calculate_annual_metrics_per_lake <- function(out_ind, site_id, site_file, ice_file, temp_ranges_file, morphometry_ind, verbose = FALSE) { | ||
|
||
if(!file.exists(site_file)) stop("File does not exist. If running summaries for GCM output, try changing `caldera_access_date` and build again.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
start_tm <- Sys.time() | ||
|
||
if(tools::file_ext(site_file) == "feather") { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,15 @@ | ||
|
||
do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_yml, n_cores, morphometry, temp_ranges, ..., | ||
site_file_regex = NULL, ice_file_regex = NULL, tmpdir_suffix = "") { | ||
site_file_regex = NULL, ice_file_regex = NULL, tmpdir_suffix = "", | ||
model_id_colname = NULL, suffix_as_model_id = TRUE) { | ||
|
||
site_files <- get_filenames_from_ind(site_file_yml) | ||
ice_files <- get_filenames_from_ind(ice_file_yml) | ||
|
||
if(!is.null(ice_file_yml)) { | ||
ice_files <- get_filenames_from_ind(ice_file_yml) | ||
} else { | ||
ice_files <- "" | ||
} | ||
|
||
# Each node on a Yeti normal partition has a max of 20 cores; nodes on Yeti UV partition do not have that same limit | ||
if(n_cores > 20) message("If using a node on the Yeti normal partition, you need to decrease n_cores to 20 or less") | ||
|
@@ -25,14 +31,14 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_y | |
|
||
temp_ranges_file <- sprintf("temp_ranges.rds") | ||
saveRDS(temp_ranges, temp_ranges_file) | ||
|
||
# Define task table rows | ||
tasks <- tibble(wtr_filename = site_files) %>% | ||
extract(wtr_filename, c('prefix','site_id','suffix'), site_file_regex, remove = FALSE) %>% | ||
left_join(extract(tibble(ice_filename = ice_files), ice_filename, c('site_id'), ice_file_regex, remove = FALSE), by = "site_id") %>% | ||
select(site_id, wtr_filename, ice_filename, prefix) | ||
select(site_id, wtr_filename, ice_filename, model_id = matches(ifelse(suffix_as_model_id, "suffix", "prefix"))) | ||
|
||
model_type <- pull(tasks, prefix) %>% unique() | ||
model_type <- pull(tasks, model_id) %>% unique() | ||
|
||
# Define task table columns | ||
|
||
|
@@ -50,40 +56,95 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_y | |
} | ||
) | ||
|
||
calc_annual_metrics <- create_task_step( | ||
step_name = 'calc_annual_metrics', | ||
# Depending on how many models per site, this will be a list of one or more steps | ||
# Doing multiple here rather than another task table per model so that the morphometry files can be shared | ||
calc_annual_metrics <- purrr::map(model_type, function(model_type_i) { create_task_step( | ||
step_name = sprintf('calc_annual_metrics_%s', model_type_i), | ||
target_name = function(task_name, step_name, ...) { | ||
sprintf("3_summarize/tmp%s/%s_%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, model_type, task_name) | ||
sprintf("3_summarize/tmp%s/%s_%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, model_type_i, task_name) | ||
}, | ||
command = function(..., task_name, steps) { | ||
task_info <- filter(tasks, site_id == task_name) | ||
task_info <- filter(tasks, site_id == task_name, model_id == model_type_i) | ||
# Return empty string (not `NULL` to avoid warnings) if there is no data for this | ||
# site_id + model_id combo (not all combos were successful runs) | ||
if(nrow(task_info) == 0) return("") | ||
psprintf("calculate_annual_metrics_per_lake(", | ||
"out_ind = target_name,", | ||
"site_id = I('%s')," = task_name, | ||
"site_file = '%s'," = task_info$wtr_filename, | ||
"ice_file = '%s'," = task_info$ice_filename, | ||
"ice_file = %s," = ifelse(is.na(task_info$ice_filename), 'I(NULL)', sprintf("'%s'", task_info$ice_filename)), | ||
"temp_ranges_file = '%s'," = temp_ranges_file, | ||
"morphometry_ind = '%s'," = steps[["split_morphometry"]]$target_name, | ||
# Doesn't actually print to console with `loop_tasks` but let's you see if you are troubleshooting individual files | ||
"verbose = I(TRUE))" | ||
) | ||
} | ||
) | ||
)}) | ||
|
||
# If there is more than one model type, we should combine them per task | ||
# so that each task has the same "final" step that we can use for `final_steps` | ||
if(length(calc_annual_metrics) > 1) { | ||
combine_model_thermal_metrics <- scipiper::create_task_step( | ||
step_name = 'combine_model_thermal_metrics', | ||
target_name = function(task_name, step_name, ...){ | ||
sprintf("3_summarize/tmp%s/%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, task_name) | ||
}, | ||
command = function(..., task_name, steps){ | ||
calc_annual_metrics_steps <- steps[grepl('calc_annual_metrics_', names(steps))] | ||
calc_annual_metrics_targets_str <- paste(sprintf("'%s'", sapply(calc_annual_metrics_steps, `[[`, "target_name")), collapse = ",") | ||
psprintf("combine_model_thermal_metrics(", | ||
"out_ind = target_name,", | ||
"model_id_colname = I('%s')," = model_id_colname, | ||
"%s)" = calc_annual_metrics_targets_str | ||
) | ||
} | ||
) | ||
|
||
task_steps_list <- c(list(split_morphometry), calc_annual_metrics, list(combine_model_thermal_metrics)) | ||
final_step_name <- "combine_model_thermal_metrics" | ||
} else { | ||
task_steps_list <- list(split_morphometry, calc_annual_metrics) | ||
final_step_name <- "calc_annual_metrics" | ||
} | ||
|
||
# Create the task plan | ||
task_plan <- create_task_plan( | ||
task_names = tasks$site_id, | ||
task_steps = list(split_morphometry, calc_annual_metrics), | ||
final_steps = c('calc_annual_metrics'), | ||
task_plan_all_combos <- create_task_plan( | ||
task_names = unique(tasks$site_id), | ||
task_steps = task_steps_list, | ||
final_steps = final_step_name, | ||
add_complete = FALSE) | ||
|
||
# Remove any steps with empty commands (there wasn't a matching site_id + model_id combo) | ||
task_plan <- lapply(task_plan_all_combos, function(task_def) { | ||
missing_command <- sapply(task_def$steps, function(step_def) nchar(step_def$command) == 0) | ||
missing_command_target_names <- sapply(task_def$steps[missing_command], `[[`, "target_name") | ||
|
||
# Remove the steps with missing commands | ||
task_def$steps[missing_command] <- NULL | ||
|
||
# Also remove in the final step for combining if necessary by removing the filepaths to | ||
# target names that don't exist | ||
task_final_step <- task_def$steps[[final_step_name]] | ||
if(!is.null(task_final_step)) { | ||
command_drop_target_names <- gsub(paste(missing_command_target_names, collapse="|"), "", task_final_step$command) | ||
command_drop_target_names <- gsub(",''|'',", "", command_drop_target_names) # Remove leftover string syntax | ||
task_def$steps[[final_step_name]]$command <- command_drop_target_names | ||
} | ||
|
||
return(task_def) | ||
}) | ||
|
||
# Need to make sure `task_plan` attributes make it back | ||
attributes(task_plan) <- attributes(task_plan_all_combos) | ||
|
||
|
||
# Create the task remakefile | ||
task_makefile <- sprintf('3_summarize_%s_metric_tasks.yml', model_type) | ||
task_makefile <- sprintf('3_summarize_%s_metric_tasks.yml', paste(model_type, collapse = ".")) # Collapse string if doing tasks for more than one model type | ||
create_task_makefile( | ||
task_plan = task_plan, | ||
makefile = task_makefile, | ||
sources = c(...), | ||
packages = c('tidyverse', 'purrr', 'readr', 'scipiper', 'arrow'), | ||
packages = c('tidyverse', 'purrr', 'readr', 'scipiper', 'arrow', 'data.table'), | ||
final_targets = final_target, | ||
finalize_funs = 'combine_thermal_metrics', | ||
as_promises = TRUE, | ||
|
@@ -92,7 +153,8 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_y | |
# Build the tasks | ||
loop_tasks(task_plan = task_plan, | ||
task_makefile = task_makefile, | ||
num_tries = 3, | ||
task_names = unique(tasks$site_id), | ||
num_tries = 1, | ||
n_cores = n_cores) | ||
|
||
# Clean up files created | ||
|
@@ -111,6 +173,20 @@ combine_thermal_metrics <- function(target_name, ...) { | |
purrr::map(list(...), function(ind) readRDS(as_data_file(ind))) %>% purrr::reduce(bind_rows) %>% readr::write_csv(target_name) | ||
} | ||
|
||
combine_model_thermal_metrics <- function(out_ind, model_id_colname, ...) { | ||
data_file <- as_data_file(out_ind) | ||
purrr::map(list(...), function(ind) { | ||
rds_file <- as_data_file(ind) | ||
file_pattern <- "(?<modelid>.*)_(?<sitenum>nhdhr_.*)_annual_thermal_metrics.rds" # based on target_name of `calc_annual_metrics` step | ||
readRDS(rds_file) %>% | ||
mutate(model_id = str_match(basename(rds_file), file_pattern)[2]) %>% | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessary for a change, but wondering if But I'm not sure what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I always forget about |
||
select(site_id, !!model_id_colname := model_id, everything()) | ||
}) %>% | ||
purrr::reduce(bind_rows) %>% | ||
saveRDS(data_file) | ||
sc_indicate(ind_file = out_ind, data_file = data_file) | ||
} | ||
|
||
split_and_save_morphometry <- function(out_ind, morphometry_file, site_id) { | ||
data_file <- as_data_file(out_ind) | ||
morphometry <- readRDS(morphometry_file) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Visualize output | ||
|
||
plot_annual_metric_summaries <- function(target_name, in_file, target_dir, model_id_colname) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. skipping over reviewing this plotting function |
||
|
||
if(!dir.exists(target_dir)) dir.create(target_dir) | ||
|
||
annual_metrics_data <- readr::read_csv(in_file) | ||
|
||
cols2plot <- names(annual_metrics_data)[which(!grepl(sprintf("site_id|GCM|year|_dt|_date|%s", model_id_colname), names(annual_metrics_data)))] | ||
|
||
annual_metrics_long <- annual_metrics_data %>% | ||
select(-matches("_dt|_date")) %>% | ||
pivot_longer(all_of(cols2plot), names_to = "metric") | ||
|
||
plot_groups <- split(cols2plot, ceiling(seq_along(cols2plot)/10)) | ||
files_out <- purrr::map(plot_groups, function(i) { | ||
p <- annual_metrics_long %>% | ||
filter(metric %in% i) %>% | ||
ggplot(aes(x = year, y = value)) + | ||
facet_grid(metric ~ ., scales = "free_y") + | ||
geom_point(alpha = 0.2, stroke = 0, shape = 16, size = 2) | ||
ggsave(sprintf("%s/%s_THROUGH_%s.png", target_dir, i[1], i[10]), | ||
plot = p, width = 5, height = 8) | ||
}) | ||
|
||
scipiper::sc_indicate(target_name, data_file = unlist(files_out)) | ||
} | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was going to suggest you combine this target with the
list.files
target, but now I see you are using two base functions and this keeps you from writing a custom function. But something to consider in the future if you run into this since there isn't a clear benefit of having the two be separate (list files is the vector of file names,sc_indicate
makes it a hash table).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yes. In
targets
I could do both at the same time without a custom function 😄 Custom function might be helpful here since I can name it in a way that describes what's happening.