diff --git a/DESCRIPTION b/DESCRIPTION index e755bf6eb..9c2db9237 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -103,15 +103,12 @@ Imports: cli, data.table, futile.logger (>= 1.4), - future, - future.apply, ggplot2, lifecycle, lubridate, methods, patchwork, posterior, - progressr, purrr, R.utils (>= 2.0.0), Rcpp (>= 0.12.0), @@ -126,9 +123,12 @@ Imports: Suggests: cmdstanr, covr, + future, + future.apply, here, knitr, precommit, + progressr, rmarkdown, spelling, testthat, diff --git a/NAMESPACE b/NAMESPACE index f2d0c3cb2..75ed3d18e 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -167,10 +167,6 @@ importFrom(futile.logger,flog.threshold) importFrom(futile.logger,flog.trace) importFrom(futile.logger,flog.warn) importFrom(futile.logger,ftry) -importFrom(future,availableCores) -importFrom(future,plan) -importFrom(future,tweak) -importFrom(future.apply,future_lapply) importFrom(ggplot2,.data) importFrom(ggplot2,aes) importFrom(ggplot2,coord_cartesian) @@ -205,8 +201,6 @@ importFrom(lubridate,days) importFrom(lubridate,wday) importFrom(patchwork,plot_layout) importFrom(posterior,mcse_mean) -importFrom(progressr,progressor) -importFrom(progressr,with_progress) importFrom(purrr,compact) importFrom(purrr,flatten) importFrom(purrr,keep) diff --git a/NEWS.md b/NEWS.md index 94400f744..471a3ef79 100644 --- a/NEWS.md +++ b/NEWS.md @@ -30,6 +30,7 @@ - `fix_dist()` has been renamed to `fix_parameters()` because it removes the uncertainty in a distribution's parameters. By @sbfnk in #733 and reviewed by @jamesmbaazam. - `plot.dist_spec` now uses color instead of line types to display pmfs vs cmfs. By @jamesmbaazam in #788 and reviewed by @sbfnk. +- The use of the `{progressr}` package for displaying progress bars is now optional, as is the use of `{future}` and `{future.apply}` for parallelisation. By @sbfnk in #798 and reviewed by @seabbs. ## Bug fixes diff --git a/R/deprecated.R b/R/deprecated.R index 3cc3d90a4..ada3a16d6 100644 --- a/R/deprecated.R +++ b/R/deprecated.R @@ -221,7 +221,6 @@ lognorm_dist_def <- function(mean, mean_sd, #' @inheritParams estimate_infections #' @inheritParams adjust_infection_to_report #' @importFrom data.table data.table rbindlist -#' @importFrom future.apply future_lapply report_cases <- function(case_estimates, case_forecast = NULL, delays, diff --git a/R/estimate_delay.R b/R/estimate_delay.R index 05c2d9aef..76af0a97b 100644 --- a/R/estimate_delay.R +++ b/R/estimate_delay.R @@ -141,7 +141,6 @@ dist_fit <- function(values = NULL, samples = 1000, cores = 1, #' #' @return A `` object summarising the bootstrapped distribution #' @importFrom purrr list_transpose -#' @importFrom future.apply future_lapply #' @importFrom rstan extract #' @importFrom data.table data.table rbindlist #' @importFrom cli cli_abort col_blue @@ -199,7 +198,7 @@ bootstrapped_dist_fit <- function(values, dist = "lognormal", dist_samples <- get_single_dist(values, samples = samples) } else { ## Fit each sub sample - dist_samples <- future.apply::future_lapply(1:bootstraps, + dist_samples <- lapply_func(1:bootstraps, function(boot) { get_single_dist( sample(values, @@ -209,12 +208,15 @@ bootstrapped_dist_fit <- function(values, dist = "lognormal", samples = ceiling(samples / bootstraps) ) }, - future.scheduling = Inf, - future.globals = c( - "values", "bootstraps", "samples", - "bootstrap_samples", "get_single_dist" - ), - future.packages = "data.table", future.seed = TRUE + future.opts = list( + future.scheduling = Inf, + future.globals = c( + "values", "bootstraps", "samples", + "bootstrap_samples", "get_single_dist" + ), + future.packages = "data.table", + future.seed = TRUE + ) ) diff --git a/R/fit.R b/R/fit.R index 953f78996..ead410bd6 100644 --- a/R/fit.R +++ b/R/fit.R @@ -20,7 +20,6 @@ #' #' @importFrom futile.logger flog.debug flog.info flog.error #' @importFrom R.utils withTimeout -#' @importFrom future.apply future_lapply #' @importFrom purrr compact #' @importFrom rstan sflist2stanfit sampling #' @importFrom rlang abort cnd_muffle @@ -103,12 +102,11 @@ fit_model_with_nuts <- function(args, future = FALSE, max_execution_time = Inf, chains <- args$chains args$chains <- 1 args$cores <- 1 - fits <- future.apply::future_lapply(1:chains, + fits <- lapply_func(1:chains, fit_chain, stan_args = args, max_time = max_execution_time, - catch = TRUE, - future.seed = TRUE + catch = TRUE ) if (stuck_chains > 0) { fits[1:stuck_chains] <- NULL diff --git a/R/regional_epinow.R b/R/regional_epinow.R index cc0296e89..09d305031 100644 --- a/R/regional_epinow.R +++ b/R/regional_epinow.R @@ -16,7 +16,7 @@ #' #' Regions can be estimated in parallel using the `{future}` package (see #' [setup_future()]). The progress of producing estimates across multiple -#' regions is tracked using the `{progressr}` package. Modify this behaviour +#' regions can be tracked using the `{progressr}` package. Modify this behaviour #' using [progressr::handlers()] and enable it in batch by setting #' `R_PROGRESSR_ENABLE=TRUE` as an environment variable. #' @@ -54,13 +54,11 @@ #' @export #' @seealso [epinow()] [estimate_infections()] [setup_future()] #' [regional_summary()] -#' @importFrom future.apply future_lapply #' @importFrom data.table as.data.table setDT copy setorder #' @importFrom purrr safely map compact keep #' @importFrom futile.logger flog.info flog.warn flog.trace #' @importFrom R.utils withTimeout #' @importFrom rlang cnd_muffle -#' @importFrom progressr with_progress progressor #' @examples #' \donttest{ #' # set number of cores to use @@ -161,9 +159,8 @@ regional_epinow <- function(data, " function" ) - progressr::with_progress({ - progress_fn <- progressr::progressor(along = regions) - regional_out <- future.apply::future_lapply(regions, run_region, + run_regions <- function(progress_fn = NULL) { + lapply_func(regions, run_region, generation_time = generation_time, delays = delays, truncation = truncation, @@ -186,10 +183,19 @@ regional_epinow <- function(data, progress_fn = progress_fn, verbose = verbose, ..., - future.scheduling = Inf, - future.seed = TRUE + future.opts = list( + future.scheduling = Inf, + future.seed = TRUE + ) ) - }) + } + if (requireNamespace("progressr", quietly = TRUE)) { + progressr::with_progress({ + regional_out <- run_regions(progressr::progressor(along = regions)) + }) + } else { + regional_out <- run_regions() + } out <- process_regions(regional_out, regions) regional_out <- out$all @@ -313,7 +319,7 @@ clean_regions <- function(data, non_zero_points) { #' #' @param target_region Character string indicating the region being evaluated #' @param progress_fn Function as returned by [progressr::progressor()]. Allows -#' the use of a progress bar. +#' the use of a progress bar. If NULL (default), no progress bar is used. #' #' @param complete_logger Character string indicating the logger to output #' the completion of estimation to. @@ -341,7 +347,7 @@ run_region <- function(target_region, output, complete_logger, verbose, - progress_fn, + progress_fn = NULL, ...) { futile.logger::flog.info("Initialising estimates for: %s", target_region, name = "EpiNow2.epinow" @@ -390,7 +396,7 @@ run_region <- function(target_region, complete_logger ) - if (!missing(progress_fn)) { + if (!is.null(progress_fn)) { progress_fn(sprintf("Region: %s", target_region)) } return(out) diff --git a/R/setup.R b/R/setup.R index 2dfd8c96d..a7964a088 100644 --- a/R/setup.R +++ b/R/setup.R @@ -121,7 +121,8 @@ setup_default_logging <- function(logs = tempdir(check = TRUE), #' A utility function that aims to streamline the set up #' of the required future backend with sensible defaults for most users of #' [regional_epinow()]. More advanced users are recommended to setup their own -#' `{future}` backend based on their available resources. +#' `{future}` backend based on their available resources. Running this requires +#' the `{future}` package to be installed. #' #' @param strategies A vector length 1 to 2 of strategies to pass to #' [future::plan()]. Nesting of parallelisation is from the top level down. @@ -136,7 +137,6 @@ setup_default_logging <- function(logs = tempdir(check = TRUE), #' #' @inheritParams regional_epinow #' @importFrom futile.logger flog.error flog.info flog.debug -#' @importFrom future availableCores plan tweak #' @importFrom cli cli_abort #' @export #' @return Numeric number of cores to use per worker. If greater than 1 pass to @@ -145,6 +145,16 @@ setup_default_logging <- function(logs = tempdir(check = TRUE), setup_future <- function(data, strategies = c("multisession", "multisession"), min_cores_per_worker = 4) { + if (!requireNamespace("future", quietly = TRUE)) { + futile.logger::flog.error( + "The future package is required for parallelisation" + ) + cli_abort( + c( + "!" = "The future package is required for parallelisation." + ) + ) + } if (length(strategies) > 2 || length(strategies) == 0) { futile.logger::flog.error("1 or 2 strategies should be used") cli_abort( diff --git a/R/simulate_infections.R b/R/simulate_infections.R index f2b106b48..3170220e7 100644 --- a/R/simulate_infections.R +++ b/R/simulate_infections.R @@ -249,13 +249,11 @@ simulate_infections <- function(estimates, R, initial_infections, #' simulate. May decrease run times due to reduced IO costs but this is still #' being evaluated. If set to NULL then all simulations are done at once. #' -#' @param verbose Logical defaults to [interactive()]. Should a progress bar -#' (from `progressr`) be shown. +#' @param verbose Logical defaults to [interactive()]. If the `progressr` +#' package is available, a progress bar will be shown. #' @inheritParams stan_opts #' @importFrom rstan extract sampling #' @importFrom purrr list_transpose map safely compact -#' @importFrom future.apply future_lapply -#' @importFrom progressr with_progress progressor #' @importFrom data.table rbindlist as.data.table #' @importFrom lubridate days #' @importFrom checkmate assert_class assert_names test_numeric test_data_frame @@ -472,20 +470,10 @@ forecast_infections <- function(estimates, safe_batch <- safely(batch_simulate) - if (backend == "cmdstanr") { - lapply_func <- lapply ## future_lapply can't handle cmdstanr - } else { - lapply_func <- function(...) future_lapply(future.seed = TRUE, ...) - } - - ## simulate in batches - with_progress({ - if (verbose) { - p <- progressor(along = batches) - } - out <- lapply_func(batches, + process_batches <- function(p = NULL) { + lapply_func(batches, function(batch) { - if (verbose) { + if (!is.null(p)) { p() } safe_batch( @@ -493,18 +481,32 @@ forecast_infections <- function(estimates, shift, dates, batch[[1]], batch[[2]] )[[1]] - } + }, + future.opts = list( + future.seed = TRUE + ), + backend = backend ) - }) + } + + ## simulate in batches + if (verbose && requireNamespace("progressr", quietly = TRUE)) { + p <- progressr::progressor(along = batches) + progressr::with_progress({ + regional_out <- process_batches(p) + }) + } else { + regional_out <- process_batches() + } ## join batches - out <- compact(out) - out <- list_transpose(out, simplify = FALSE) - out <- map(out, rbindlist) + regional_out <- compact(regional_out) + regional_out <- list_transpose(regional_out, simplify = FALSE) + regional_out <- map(regional_out, rbindlist) ## format output format_out <- format_fit( - posterior_samples = out, + posterior_samples = regional_out, horizon = estimates$args$horizon, shift = shift, burn_in = 0, diff --git a/R/utilities.R b/R/utilities.R index a06b6e206..91867b062 100644 --- a/R/utilities.R +++ b/R/utilities.R @@ -402,6 +402,21 @@ set_dt_single_thread <- function() { ) } +#' Choose a parallel or sequential apply function +#' +#' Internal function that chooses an appropriate "apply"-type function (either +#' [lapply()] or [future.apply::future_lapply()]) +#' @return A function that can be used to apply a function to a list +#' @keywords internal +#' @inheritParams stan_opts +lapply_func <- function(..., backend = "rstan", future.opts = list()) { + if (requireNamespace("future.apply", quietly = TRUE) && backend == "rstan") { + do.call(future.apply::future_lapply, c(list(...), future.opts)) + } else { + lapply(...) + } +} + #' @importFrom stats glm median na.omit pexp pgamma plnorm quasipoisson rexp #' @importFrom stats rlnorm rnorm rpois runif sd var rgamma pnorm globalVariables( diff --git a/man/forecast_infections.Rd b/man/forecast_infections.Rd index 04e617011..e9c4fbde4 100644 --- a/man/forecast_infections.Rd +++ b/man/forecast_infections.Rd @@ -38,8 +38,8 @@ being evaluated. If set to NULL then all simulations are done at once.} \item{backend}{Character string indicating the backend to use for fitting stan models. Supported arguments are "rstan" (default) or "cmdstanr".} -\item{verbose}{Logical defaults to \code{\link[=interactive]{interactive()}}. Should a progress bar -(from \code{progressr}) be shown.} +\item{verbose}{Logical defaults to \code{\link[=interactive]{interactive()}}. If the \code{progressr} +package is available, a progress bar will be shown.} } \value{ A list of output as returned by \code{\link[=estimate_infections]{estimate_infections()}} but based on diff --git a/man/lapply_func.Rd b/man/lapply_func.Rd new file mode 100644 index 000000000..d7d5cc7d5 --- /dev/null +++ b/man/lapply_func.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utilities.R +\name{lapply_func} +\alias{lapply_func} +\title{Choose a parallel or sequential apply function} +\usage{ +lapply_func(..., backend = "rstan", future.opts = list()) +} +\arguments{ +\item{...}{Additional parameters to pass to underlying option functions, +\code{\link[=stan_sampling_opts]{stan_sampling_opts()}} or \code{\link[=stan_vb_opts]{stan_vb_opts()}}, depending on the method} + +\item{backend}{Character string indicating the backend to use for fitting +stan models. Supported arguments are "rstan" (default) or "cmdstanr".} +} +\value{ +A function that can be used to apply a function to a list +} +\description{ +Internal function that chooses an appropriate "apply"-type function (either +\code{\link[=lapply]{lapply()}} or \code{\link[future.apply:future_lapply]{future.apply::future_lapply()}}) +} +\keyword{internal} diff --git a/man/regional_epinow.Rd b/man/regional_epinow.Rd index 67ab215c5..4a1c754f7 100644 --- a/man/regional_epinow.Rd +++ b/man/regional_epinow.Rd @@ -132,7 +132,7 @@ available to facilitate building this list. Regions can be estimated in parallel using the \code{{future}} package (see \code{\link[=setup_future]{setup_future()}}). The progress of producing estimates across multiple -regions is tracked using the \code{{progressr}} package. Modify this behaviour +regions can be tracked using the \code{{progressr}} package. Modify this behaviour using \code{\link[progressr:handlers]{progressr::handlers()}} and enable it in batch by setting \code{R_PROGRESSR_ENABLE=TRUE} as an environment variable. } diff --git a/man/run_region.Rd b/man/run_region.Rd index 078a301d6..7f7881f71 100644 --- a/man/run_region.Rd +++ b/man/run_region.Rd @@ -23,7 +23,7 @@ run_region( output, complete_logger, verbose, - progress_fn, + progress_fn = NULL, ... ) } @@ -98,7 +98,7 @@ the completion of estimation to.} to the console from \code{\link[=epinow]{epinow()}}.} \item{progress_fn}{Function as returned by \code{\link[progressr:progressor]{progressr::progressor()}}. Allows -the use of a progress bar.} +the use of a progress bar. If NULL (default), no progress bar is used.} \item{...}{Pass additional arguments to \code{\link[=epinow]{epinow()}}. See the documentation for \code{\link[=epinow]{epinow()}} for details.} diff --git a/man/setup_future.Rd b/man/setup_future.Rd index 3e63e3985..22de21ab3 100644 --- a/man/setup_future.Rd +++ b/man/setup_future.Rd @@ -35,5 +35,6 @@ Numeric number of cores to use per worker. If greater than 1 pass to A utility function that aims to streamline the set up of the required future backend with sensible defaults for most users of \code{\link[=regional_epinow]{regional_epinow()}}. More advanced users are recommended to setup their own -\code{{future}} backend based on their available resources. +\code{{future}} backend based on their available resources. Running this requires +the \code{{future}} package to be installed. }