Merge pull request #59 from lindsayplatt/pipeline_new_projected_GLM
Add pipeline changes to run annual metrics for new GCM GLM outputs
lindsayplatt authored May 11, 2022
2 parents a9ea210 + feb4169 commit 2b4520b
Showing 9 changed files with 231 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .Renviron
@@ -1 +1 @@
R_LIBS_USER="/cxfs/projects/usgs/water/iidd/data-sci/lake-temp/lake-temperature-out/Rlib_3_6":"/opt/ohpc/pub/usgs/libs/gnu8/R/3.6.3/lib64/R/library"
R_LIBS_USER="/caldera/projects/usgs/water/iidd/datasci/lake-temp/lake-temperature-out/Rlib_3_6":"/home/software/tallgrass/arc/apps/R/3.6.3/lib64/R/library"
37 changes: 37 additions & 0 deletions 1_fetch.yml
@@ -7,6 +7,7 @@ packages:

sources:
- 1_fetch/src/fetch_sb_item_files.R
- 1_fetch/src/fetch_caldera_file_helpers.R

targets:
1_fetch:
@@ -28,6 +29,9 @@ targets:
- 1_fetch/out/temperature_observations.zip
- 1_fetch/out/pb0_matched_to_observations.zip
- 1_fetch/out/pgdl_matched_to_observations.zip
# climate projections glm3 data release target
- 1_fetch/out/pb0_temp_projections.yml
- 1_fetch/out/pb0_temp_projections_nml.rds.ind

sb_dl_date:
command: c(I("2021-04-08"))
@@ -167,3 +171,36 @@ targets:
sb_id = I("5e774324e4b01d509270e29f"),
sb_filename = I("pgdl_matched_to_observations.zip"),
dummy = sb_dl_date)

##-- Access files for GLM3 + GCM projections data release --##

# TODO: Switch to downloading the NetCDF file(s) from ScienceBase
# to use as input instead. That would be less brittle.

# Identifying which lakes / GLM temp output files to use for
# the lake thermal metrics summaries depends on us changing this
# date to refresh the list of files and hashes. Not the most
# ideal process, but it is a practical compromise right now
# so that we can complete this work.
caldera_access_date:
command: c(I('2022-02-16'))

# These files from our process-models pipeline are also pushed to ScienceBase
# item `6206d3c2d34ec05caca53071`, but since we are running this on Tallgrass,
# we have access to Caldera and can skip re-downloading them.
1_fetch/out/pb0_temp_projections.yml:
command: indicate_dir_files(
target_name,
path = I('../lake-temperature-process-models/3_extract/out'),
pattern = I('GLM_nhdhr_(.*)(GFDL|IPSL|MIROC5|MRI|ACCESS|CNRM).feather'))
depends:
- caldera_access_date

# These files from our process-models pipeline are also pushed to ScienceBase
# item `6206d3c2d34ec05caca53071`, but since we are running this on Tallgrass,
# we have access to Caldera and can skip re-downloading them.
1_fetch/out/pb0_temp_projections_nml.rds.ind:
command: copy_caldera_files(target_name, files_in = I('../lake-temperature-process-models/1_prep/in/nml_list.rds'))
depends:
- caldera_access_date
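A note on the mechanics here: `indicate_dir_files()` (defined in the file below) writes a scipiper indicator file mapping each matched Caldera filepath to its hash, so downstream targets rebuild whenever the file list or file contents change, and bumping `caldera_access_date` is what forces the list itself to refresh. A minimal R sketch of the equivalent behavior (the entry in the final comment is hypothetical):

# Roughly what building 1_fetch/out/pb0_temp_projections.yml does:
files <- list.files(path = '../lake-temperature-process-models/3_extract/out',
                    pattern = 'GLM_nhdhr_(.*)(GFDL|IPSL|MIROC5|MRI|ACCESS|CNRM).feather',
                    full.names = TRUE)
scipiper::sc_indicate('1_fetch/out/pb0_temp_projections.yml', data_file = files)
# The indicator YAML maps filepath -> hash, e.g. (hypothetical entry):
#   ../lake-temperature-process-models/3_extract/out/GLM_nhdhr_12345_GFDL.feather: 73f9...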

24 changes: 24 additions & 0 deletions 1_fetch/src/fetch_caldera_file_helpers.R
@@ -0,0 +1,24 @@

#' Indicate all files from a single directory into a single hash table
indicate_dir_files <- function(out_ind, path, pattern) {
  files_to_indicate <- list.files(path = path,
                                  pattern = pattern,
                                  full.names = TRUE)
  sc_indicate(out_ind, data_file = files_to_indicate)
}

#' Use `out_dir` if you are copying multiple files and saving
#' a hash table as the ind. Leave as `NULL` if you are only
#' copying and saving one file.
copy_caldera_files <- function(out_ind, files_in, out_dir = NULL) {

if(is.null(out_dir)) {
files_out <- as_data_file(out_ind)
} else {
files_out <- file.path(out_dir, basename(files_in))
}

file.copy(from = files_in, to = files_out)
sc_indicate(out_ind, data_file = files_out)

}
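A rough usage sketch of the two modes described in the comment above. The single-file call mirrors the `pb0_temp_projections_nml.rds.ind` target in `1_fetch.yml`; the multi-file call is hypothetical and assumes the output directory already exists:

# Single-file mode: the destination is derived from the indicator name, i.e.
# '1_fetch/out/pb0_temp_projections_nml.rds.ind' -> '1_fetch/out/pb0_temp_projections_nml.rds'
copy_caldera_files('1_fetch/out/pb0_temp_projections_nml.rds.ind',
                   files_in = '../lake-temperature-process-models/1_prep/in/nml_list.rds')

# Multi-file mode: copy into `out_dir`, then hash all copies into one indicator file
# (hypothetical input paths)
copy_caldera_files('1_fetch/out/example_copies.yml',
                   files_in = c('/caldera/projects/example/lake1.feather',
                                '/caldera/projects/example/lake2.feather'),
                   out_dir = '1_fetch/out')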
6 changes: 6 additions & 0 deletions 2_process.yml
@@ -214,3 +214,9 @@ targets:
"2_process/src/do_obs_lakes_tasks.R")
depends:
- morphometry

##-- Munge files for GLM3 + GCM projections data release --##

# Not much munging required right now
glm3_pb0gcm_proj_morphometry:
command: nml_to_morphometry(in_ind = '1_fetch/out/pb0_temp_projections_nml.rds.ind')
4 changes: 4 additions & 0 deletions 2_process/src/process_helpers.R
@@ -14,6 +14,10 @@ extract_morphometry <- function(config_fn) {
return(morphometry_out)
}

nml_to_morphometry <- function(in_ind) {
purrr::map(readRDS(as_data_file(in_ind)), `[`, c("latitude", "longitude", "H", "A"))
}

unzip_data <- function(target_name, data_file, out_dir) {
# Sometimes (as is the case with irradiance and clarity), the incoming
# file has multiple zip files that need to be unpacked and saved
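A small sketch of what `nml_to_morphometry()` above consumes and returns, assuming `nml_list.rds` holds a named list of per-lake GLM parameter lists (values made up):

nml_list <- list(
  nhdhr_120018092 = list(latitude = 45.2, longitude = -93.5,
                         H = c(255, 256, 257), A = c(1000, 5000, 9000),
                         sed_temp_mean = 9.5),  # any extra nml entries are dropped
  nhdhr_120018495 = list(latitude = 46.1, longitude = -94.0,
                         H = c(300, 301), A = c(2000, 8000))
)
purrr::map(nml_list, `[`, c("latitude", "longitude", "H", "A"))
# Returns the same named list, with each element trimmed to the four morphometry entries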
33 changes: 33 additions & 0 deletions 3_summarize.yml
@@ -2,6 +2,7 @@ target_default: 3_summarize

packages:
- arrow
- data.table
- dplyr
- readr
- tidyr
@@ -12,6 +13,7 @@
- 2_process/src/calculate_toha.R # Has the benthic area fxn & resample_hypso
- 3_summarize/src/annual_thermal_metrics.R
- 3_summarize/src/do_annual_thermal_metric_tasks.R
- 3_summarize/src/plot_annual_metric_summaries.R

targets:
3_summarize:
@@ -25,6 +27,8 @@
# multi-state glm2 data release targets
- 3_summarize/out/annual_metrics_glm2pb0.csv
- 3_summarize/out/annual_metrics_glm2pball.csv
# climate projections glm3 data release target
- 3_summarize/out/annual_metrics_glm3pb0_gcms.csv

##-- MN TOHA Data Release --##

@@ -120,3 +124,32 @@ targets:
'3_summarize/src/annual_thermal_metrics.R')
depends:
- temp_ranges

##-- GLM3 + GCM Projections Data Release --##

# Do all GCMs at once and combine into a single file
3_summarize/out/annual_metrics_glm3pb0_gcms.csv:
command: do_annual_metrics_multi_lake(
final_target = target_name,
site_file_yml = "1_fetch/out/pb0_temp_projections.yml",
ice_file_yml = I(NULL),
n_cores = 79,
morphometry = glm3_pb0gcm_proj_morphometry,
temp_ranges = temp_ranges,
site_file_regex = I("(GLM)_(.*)_(.*).feather"),
tmpdir_suffix = I("_proj"),
model_id_colname = I("GCM"),
suffix_as_model_id = I(TRUE),
'2_process/src/calculate_toha.R',
'3_summarize/src/annual_thermal_metrics.R',
'3_summarize/src/do_annual_thermal_metric_tasks.R')
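For a sense of how `site_file_regex` and `suffix_as_model_id` interact: the task-table code in `do_annual_thermal_metric_tasks.R` splits each filename with `tidyr::extract()`, and with this regex the suffix capture is the GCM name. A sketch with a hypothetical filename:

library(tidyverse)
tibble(wtr_filename = "GLM_nhdhr_120018092_GFDL.feather") %>%
  extract(wtr_filename, c("prefix", "site_id", "suffix"),
          "(GLM)_(.*)_(.*).feather", remove = FALSE)
# prefix = "GLM", site_id = "nhdhr_120018092", suffix = "GFDL";
# with suffix_as_model_id = TRUE, the suffix ("GFDL") becomes the task table's model_id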

# Verify outputs through some plots:
3_summarize/out_plots/annual_metrics_plots_gcms.yml:
command: plot_annual_metric_summaries(
target_name,
in_file = "3_summarize/out/annual_metrics_glm3pb0_gcms.csv",
target_dir = I("3_summarize/out_plots"),
model_id_colname = I("GCM"))


2 changes: 2 additions & 0 deletions 3_summarize/src/annual_thermal_metrics.R
@@ -1,6 +1,8 @@

calculate_annual_metrics_per_lake <- function(out_ind, site_id, site_file, ice_file, temp_ranges_file, morphometry_ind, verbose = FALSE) {

if(!file.exists(site_file)) stop("File does not exist. If running summaries for GCM output, try changing `caldera_access_date` and build again.")

start_tm <- Sys.time()

if(tools::file_ext(site_file) == "feather") {
112 changes: 94 additions & 18 deletions 3_summarize/src/do_annual_thermal_metric_tasks.R
@@ -1,9 +1,15 @@

do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_yml, n_cores, morphometry, temp_ranges, ...,
site_file_regex = NULL, ice_file_regex = NULL, tmpdir_suffix = "") {
site_file_regex = NULL, ice_file_regex = NULL, tmpdir_suffix = "",
model_id_colname = NULL, suffix_as_model_id = TRUE) {

site_files <- get_filenames_from_ind(site_file_yml)
ice_files <- get_filenames_from_ind(ice_file_yml)

if(!is.null(ice_file_yml)) {
ice_files <- get_filenames_from_ind(ice_file_yml)
} else {
ice_files <- ""
}

# Each node on a Yeti normal partition has a max of 20 cores; nodes on Yeti UV partition do not have that same limit
if(n_cores > 20) message("If using a node on the Yeti normal partition, you need to decrease n_cores to 20 or less")
@@ -25,14 +31,14 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_yml, n_cores, morphometry, temp_ranges, ...,

temp_ranges_file <- sprintf("temp_ranges.rds")
saveRDS(temp_ranges, temp_ranges_file)

# Define task table rows
tasks <- tibble(wtr_filename = site_files) %>%
extract(wtr_filename, c('prefix','site_id','suffix'), site_file_regex, remove = FALSE) %>%
left_join(extract(tibble(ice_filename = ice_files), ice_filename, c('site_id'), ice_file_regex, remove = FALSE), by = "site_id") %>%
select(site_id, wtr_filename, ice_filename, prefix)
select(site_id, wtr_filename, ice_filename, model_id = matches(ifelse(suffix_as_model_id, "suffix", "prefix")))

model_type <- pull(tasks, prefix) %>% unique()
model_type <- pull(tasks, model_id) %>% unique()

# Define task table columns

@@ -50,40 +56,95 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_yml, n_cores, morphometry, temp_ranges, ...,
}
)

calc_annual_metrics <- create_task_step(
step_name = 'calc_annual_metrics',
  # Depending on how many models there are per site, this will be a list of one or more steps.
  # Doing multiple steps here, rather than a separate task table per model, so that
  # the morphometry files can be shared across models.
calc_annual_metrics <- purrr::map(model_type, function(model_type_i) { create_task_step(
step_name = sprintf('calc_annual_metrics_%s', model_type_i),
target_name = function(task_name, step_name, ...) {
sprintf("3_summarize/tmp%s/%s_%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, model_type, task_name)
sprintf("3_summarize/tmp%s/%s_%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, model_type_i, task_name)
},
command = function(..., task_name, steps) {
task_info <- filter(tasks, site_id == task_name)
task_info <- filter(tasks, site_id == task_name, model_id == model_type_i)
      # Return an empty string (not `NULL`, to avoid warnings) if there is no data for this
      # site_id + model_id combo (not all combos were successful runs)
if(nrow(task_info) == 0) return("")
psprintf("calculate_annual_metrics_per_lake(",
"out_ind = target_name,",
"site_id = I('%s')," = task_name,
"site_file = '%s'," = task_info$wtr_filename,
"ice_file = '%s'," = task_info$ice_filename,
"ice_file = %s," = ifelse(is.na(task_info$ice_filename), 'I(NULL)', sprintf("'%s'", task_info$ice_filename)),
"temp_ranges_file = '%s'," = temp_ranges_file,
"morphometry_ind = '%s'," = steps[["split_morphometry"]]$target_name,
               # Doesn't actually print to the console with `loop_tasks`, but lets you see progress if you are troubleshooting individual files
"verbose = I(TRUE))"
)
}
)
)})

# If there is more than one model type, we should combine them per task
# so that each task has the same "final" step that we can use for `final_steps`
if(length(calc_annual_metrics) > 1) {
combine_model_thermal_metrics <- scipiper::create_task_step(
step_name = 'combine_model_thermal_metrics',
target_name = function(task_name, step_name, ...){
sprintf("3_summarize/tmp%s/%s_annual_thermal_metrics.rds.ind", tmpdir_suffix, task_name)
},
command = function(..., task_name, steps){
calc_annual_metrics_steps <- steps[grepl('calc_annual_metrics_', names(steps))]
calc_annual_metrics_targets_str <- paste(sprintf("'%s'", sapply(calc_annual_metrics_steps, `[[`, "target_name")), collapse = ",")
psprintf("combine_model_thermal_metrics(",
"out_ind = target_name,",
"model_id_colname = I('%s')," = model_id_colname,
"%s)" = calc_annual_metrics_targets_str
)
}
)

task_steps_list <- c(list(split_morphometry), calc_annual_metrics, list(combine_model_thermal_metrics))
final_step_name <- "combine_model_thermal_metrics"
} else {
task_steps_list <- list(split_morphometry, calc_annual_metrics)
final_step_name <- "calc_annual_metrics"
}

# Create the task plan
task_plan <- create_task_plan(
task_names = tasks$site_id,
task_steps = list(split_morphometry, calc_annual_metrics),
final_steps = c('calc_annual_metrics'),
task_plan_all_combos <- create_task_plan(
task_names = unique(tasks$site_id),
task_steps = task_steps_list,
final_steps = final_step_name,
add_complete = FALSE)

# Remove any steps with empty commands (there wasn't a matching site_id + model_id combo)
task_plan <- lapply(task_plan_all_combos, function(task_def) {
missing_command <- sapply(task_def$steps, function(step_def) nchar(step_def$command) == 0)
missing_command_target_names <- sapply(task_def$steps[missing_command], `[[`, "target_name")

# Remove the steps with missing commands
task_def$steps[missing_command] <- NULL

    # Also, if there is a combining step, remove from its command the filepaths of
    # target names that no longer exist
task_final_step <- task_def$steps[[final_step_name]]
if(!is.null(task_final_step)) {
command_drop_target_names <- gsub(paste(missing_command_target_names, collapse="|"), "", task_final_step$command)
command_drop_target_names <- gsub(",''|'',", "", command_drop_target_names) # Remove leftover string syntax
task_def$steps[[final_step_name]]$command <- command_drop_target_names
}

return(task_def)
})

  # Need to make sure the `task_plan` attributes carry over to the filtered plan
attributes(task_plan) <- attributes(task_plan_all_combos)


# Create the task remakefile
task_makefile <- sprintf('3_summarize_%s_metric_tasks.yml', model_type)
task_makefile <- sprintf('3_summarize_%s_metric_tasks.yml', paste(model_type, collapse = ".")) # Collapse string if doing tasks for more than one model type
create_task_makefile(
task_plan = task_plan,
makefile = task_makefile,
sources = c(...),
packages = c('tidyverse', 'purrr', 'readr', 'scipiper', 'arrow'),
packages = c('tidyverse', 'purrr', 'readr', 'scipiper', 'arrow', 'data.table'),
final_targets = final_target,
finalize_funs = 'combine_thermal_metrics',
as_promises = TRUE,
@@ -92,7 +153,8 @@ do_annual_metrics_multi_lake <- function(final_target, site_file_yml, ice_file_yml, n_cores, morphometry, temp_ranges, ...,
# Build the tasks
loop_tasks(task_plan = task_plan,
task_makefile = task_makefile,
num_tries = 3,
task_names = unique(tasks$site_id),
num_tries = 1,
n_cores = n_cores)

# Clean up files created
@@ -111,6 +173,20 @@ combine_thermal_metrics <- function(target_name, ...) {
purrr::map(list(...), function(ind) readRDS(as_data_file(ind))) %>% purrr::reduce(bind_rows) %>% readr::write_csv(target_name)
}

combine_model_thermal_metrics <- function(out_ind, model_id_colname, ...) {
data_file <- as_data_file(out_ind)
purrr::map(list(...), function(ind) {
rds_file <- as_data_file(ind)
file_pattern <- "(?<modelid>.*)_(?<sitenum>nhdhr_.*)_annual_thermal_metrics.rds" # based on target_name of `calc_annual_metrics` step
readRDS(rds_file) %>%
mutate(model_id = str_match(basename(rds_file), file_pattern)[2]) %>%
select(site_id, !!model_id_colname := model_id, everything())
}) %>%
purrr::reduce(bind_rows) %>%
saveRDS(data_file)
sc_indicate(ind_file = out_ind, data_file = data_file)
}
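A quick check of the `file_pattern` match above, using a basename shaped like the `calc_annual_metrics_*` targets (hypothetical site id). `str_match()` returns a matrix of the full match plus capture groups, so `[2]` is the first capture, i.e. the model id:

stringr::str_match("GFDL_nhdhr_120018092_annual_thermal_metrics.rds",
                   "(?<modelid>.*)_(?<sitenum>nhdhr_.*)_annual_thermal_metrics.rds")
# 1 x 3 matrix: [1] the full match, [2] "GFDL", [3] "nhdhr_120018092"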

split_and_save_morphometry <- function(out_ind, morphometry_file, site_id) {
data_file <- as_data_file(out_ind)
morphometry <- readRDS(morphometry_file)
30 changes: 30 additions & 0 deletions 3_summarize/src/plot_annual_metric_summaries.R
@@ -0,0 +1,30 @@
# Visualize output

plot_annual_metric_summaries <- function(target_name, in_file, target_dir, model_id_colname) {

if(!dir.exists(target_dir)) dir.create(target_dir)

annual_metrics_data <- readr::read_csv(in_file)

cols2plot <- names(annual_metrics_data)[which(!grepl(sprintf("site_id|GCM|year|_dt|_date|%s", model_id_colname), names(annual_metrics_data)))]

annual_metrics_long <- annual_metrics_data %>%
select(-matches("_dt|_date")) %>%
pivot_longer(all_of(cols2plot), names_to = "metric")

plot_groups <- split(cols2plot, ceiling(seq_along(cols2plot)/10))
files_out <- purrr::map(plot_groups, function(i) {
p <- annual_metrics_long %>%
filter(metric %in% i) %>%
ggplot(aes(x = year, y = value)) +
facet_grid(metric ~ ., scales = "free_y") +
geom_point(alpha = 0.2, stroke = 0, shape = 16, size = 2)
    # Index the last element with i[length(i)] so a final, partial group of
    # metrics doesn't put an NA in the filename
    ggsave(sprintf("%s/%s_THROUGH_%s.png", target_dir, i[1], i[length(i)]),
           plot = p, width = 5, height = 8)
})

scipiper::sc_indicate(target_name, data_file = unlist(files_out))
}
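The `split(cols2plot, ceiling(seq_along(cols2plot)/10))` call above chunks the metric columns into groups of at most 10 per PNG. A tiny illustration with made-up metric names, which also shows why the filename indexes `i[length(i)]` rather than a hard-coded `i[10]`:

cols2plot <- paste0("metric_", 1:23)
split(cols2plot, ceiling(seq_along(cols2plot) / 10))
# $`1`: metric_1..metric_10, $`2`: metric_11..metric_20, $`3`: metric_21..metric_23
# The final group has only 3 names, so i[10] there would be NA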


