Skip to content

Latest commit

 

History

History
1028 lines (865 loc) · 25.2 KB

_targets.md

File metadata and controls

1028 lines (865 loc) · 25.2 KB

Analysis pipeline: Evaluating Semi-Parametric Nowcasts of COVID-19 Hospital Admissions in Germany

Sam Abbott

Pipeline

The analysis pipeline for this work can be regenerated by rendering this file,

rmarkdown::render("_targets.Rmd")

The pipeline can then be run using,

tar_make()

The complete pipeline can be visualised using,

tar_visnetwork()

Alternatively the pipeline can be explored interactively using this notebook or updated programmatically using the scripts in bin. We also provide an archived version of our targets workflow if only wanting to reproduce sections of our analysis. This can be downloaded using the following,

source(here::here("R", "targets-archive.R"))
get_targets_archive()

Setup

Set up the workflow pipeline and options. We first load the targets package and remove the potentially outdated workflow.

library(targets)
library(tarchetypes)
library(data.table)
library(epinowcast)
library(ggplot2)
library(purrr, quietly = TRUE)
#> 
#> Attaching package: 'purrr'
#> The following object is masked from 'package:data.table':
#> 
#>     transpose
library(here)
#> here() starts at /eval-germany-sp-nowcasting
library(future)
library(future.callr)
tar_unscript()

We now define shared global options across our workflow and load R functions from the R folder.

library(targets)
library(tarchetypes)
library(cmdstanr)
library(data.table)
library(epinowcast)
library(ggplot2)
library(purrr, quietly = TRUE)
library(here)
library(future)
library(future.callr)
plan(callr)
functions <- list.files(here("R"), full.names = TRUE)
walk(functions, source)
rm("functions")
set_cmdstan_path()  
tar_option_set(
  packages = c("data.table", "epinowcast", "scoringutils", "ggplot2", "purrr",
               "cmdstanr", "here"),
  deployment = "worker",
  memory = "transient",
  workspace_on_error = TRUE,
  error = "continue",
  garbage_collection = TRUE
)
#> Establish _targets.R and _targets_r/globals/globals.R.

Ingest data and stratify by location and report date

  • Watch the URL at which hospitalisation data is hosted in order to trigger an update to the workflow when new data is uploaded
tar_url(
  hospitalisation_url,
  "https://raw.githubusercontent.com/KITmetricslab/hospitalization-nowcast-hub/main/data-truth/COVID-19/COVID-19_hospitalizations_preprocessed.csv" # nolint
)
#> Establish _targets.R and _targets_r/targets/hospitalisation_url.R.
  • Download and process hospitalisation data. Also download and combined data on public holidays in Germany.
tar_target(hospitalisations, {
  get_germany_hospitalisations(url = hospitalisation_url)
})
#> Define target hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/hospitalisations.R.
  • Define date to start nowcasting
tar_target(start_date, {
  as.Date("2021-10-01")
})
#> Define target start_date from chunk code.
#> Establish _targets.R and _targets_r/targets/start_date.R.
  • Define dates to nowcast.
tar_target(nowcast_dates, {
  unique(
    hospitalisations[reference_date >= start_date]$reference_date
  )
})
#> Define target nowcast_dates from chunk code.
#> Establish _targets.R and _targets_r/targets/nowcast_dates.R.
  • Define age groups
tar_target(age_groups, {
  unique(hospitalisations$age_group)
})
#> Define target age_groups from chunk code.
#> Establish _targets.R and _targets_r/targets/age_groups.R.
  • Define locations to nowcast using age group data (currently limited to just national level).
tar_target(locations, {
  "DE"
})
#> Define target locations from chunk code.
#> Establish _targets.R and _targets_r/targets/locations.R.
  • Define locations in which to nowcast only for the overall count and not for each age group.
tar_target(other_locations, {
  setdiff(unique(hospitalisations$location), locations)
})
#> Define target other_locations from chunk code.
#> Establish _targets.R and _targets_r/targets/other_locations.R.
  • Define maximum allowed reporting delay (in days).
tar_target(max_report_delay, {
  40
})
#> Define target max_report_delay from chunk code.
#> Establish _targets.R and _targets_r/targets/max_report_delay.R.
  • Define hospitalisations by date of report. Note that here the target is defined to never update if data exists for that nowcast date. This is because the truth date for previous days can change slightly over time as negative cases are adjusted for leading to spurious refits.
tar_target(
  hospitalisations_by_date_report,
  enw_retrospective_data(
    hospitalisations,
    rep_date = nowcast_dates,
    ref_days = max_report_delay
  )[, nowcast_date := nowcast_dates],
  map(nowcast_dates),
  iteration = "list",
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/hospitalisations_by_date_report.R.
  • Define latest available data.
tar_target(latest_hospitalisations, {
   enw_latest_data(hospitalisations)
})
#> Define target latest_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/latest_hospitalisations.R.
  • Make latest observations into 7 day incidence.
tar_target(latest_7day_hospitalisations, {
  copy(latest_hospitalisations)[,
    confirm := frollsum(confirm, n =  7), by = c("age_group", "location")
  ][!is.na(confirm)]
})
#> Define target latest_7day_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/latest_7day_hospitalisations.R.
  • Define “complete” data (i.e more than 28 days have passed since first reported)
tar_target(complete_hospitalisations, {
  latest_hospitalisations[reference_date < (max(reference_date) - 28)]
})
#> Define target complete_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/complete_hospitalisations.R.
  • Similarly, define “complete” 7 day hospitalisation incidence.
tar_target(complete_7day_hospitalisations, {
  latest_7day_hospitalisations[reference_date < (max(reference_date) - 28)]
})
#> Define target complete_7day_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/complete_7day_hospitalisations.R.
  • Plot reporting delay percentage by date, location, and age group.

Fit models and produce nowcasts

Model and model settings

  • Compile the model for multithread use.
tar_file(
  epinowcast_model,
  compile_model(),
  deployment = "main"
)
#> Establish _targets.R and _targets_r/targets/epinowcast_model.R.
  • Define stan settings shared across models and used for fitting
tar_target(epinowcast_settings, {
  list(
    save_warmup = FALSE,
    output_loglik = FALSE,
    pp = FALSE,
    iter_warmup = 1000,
    iter_sampling = 1000,
    chains = 2,
    parallel_chains = 2,
    threads_per_chain = 1,
    adapt_delta = 0.95,
    show_messages = FALSE,
    refresh = 0
  )
})
#> Define target epinowcast_settings from chunk code.
#> Establish _targets.R and _targets_r/targets/epinowcast_settings.R.

Priors

  • Define a set of uninformed priors using the defaults from epinowcast.
tar_target(uninformed_priors, {
  enw_priors()
})
#> Define target uninformed_priors from chunk code.
#> Establish _targets.R and _targets_r/targets/uninformed_priors.R.
  • Define a set of observations to use as a source for informed priors. Here we use overall national hospitalisations and data from the 1st of July 2021.
tar_target(
  prior_obs,
  enw_retrospective_data(
    hospitalisations[location == "DE"][age_group == "00+"],
    rep_date = as.Date("2021-07-01"), ref_days = max_report_delay
  ),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/prior_obs.R.
  • Fit a model to national overall hospitalisations only and extract posterior estimates for the delay distribution and overdispersion to use as informed priors for other models by assuming a normal distribution with their posterior standard deviations scaled by 10 times.
tar_target(priors, {
  do.call(prior_epinowcast, c(
    list(
      prior_obs, max_delay = max_report_delay, scale = 10,
      priors = uninformed_priors
    ),
    epinowcast_settings
  ))
})
#> Define target priors from chunk code.
#> Establish _targets.R and _targets_r/targets/priors.R.

Models

  • Intercept only model.
tar_target(
  fixed_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = fixed_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/fixed_nowcast.R.
  • Intercept model with day of the week reporting effects.
tar_target(
  dow_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = dow_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/dow_nowcast.R.
  • Age group random effect model with day of the week effect reporting effects.
tar_target(
  age_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = age_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/age_nowcast.R.
  • Age group random effect model with a weekly random walk and a day of the week reporting effect.
tar_target(
  week_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = week_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/week_nowcast.R.
  • Age group random effect model with an age group specific weekly random walk, and a day of the week reporting effect.
tar_target(
  age_week_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = age_week_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/age_week_nowcast.R.
  • Independent age group model with a weekly random walk and a day of the week reporting model.
tar_target(
  independent_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report[age_group == age_groups],
    tar_loc = locations,
    model = independent_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations, age_groups),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/independent_nowcast.R.
  • Independent age group model with a weekly random walk and a day of the week reference and reporting model.
tar_target(
  independent_ref_dow_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report[age_group == age_groups],
    tar_loc = locations,
    model = independent_ref_dow_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations, age_groups),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/independent_ref_dow_nowcast.R.
  • Model for locations without age groups. As for the previous model this has a weekly random walk and a day of the week reporting and reference model. This was updated on the 6th of December 2021 from the independent model without a day of the week effect for reference date (to enable this note that we have overridden the default cue settings here so that once a model has been fit it is never run again for that combination of locations and dates regardless of upstream changes).
tar_target(
  overall_only_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report[age_group == "00+"],
    tar_loc = other_locations,
    model = independent_ref_dow_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, other_locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/overall_only_nowcast.R.

Postprocess

  • Combine nowcasts from each model
tar_target(combined_nowcasts, {
  rbindlist(list(
    fixed_nowcast,
    dow_nowcast,
    age_nowcast,
    week_nowcast,
    age_week_nowcast,
    independent_nowcast,
    overall_only_nowcast,
    independent_ref_dow_nowcast
  ), use.names = TRUE, fill = TRUE)[,
     model := factor(
      model,
      levels = c("Reference: Fixed, Report: Fixed",
                 "Reference: Fixed, Report: Day of week",
                 "Reference: Age, Report: Day of week",
                 "Reference: Age and week, Report: Day of week",
                 "Reference: Age and week by age, Report: Day of week",
                 "Independent by age, Reference: Week, Report: Day of week",
                 "Independent by age, Reference: Week and day of week, Report: Day of week")
     )
    ][, id := 1:.N]
})
#> Define target combined_nowcasts from chunk code.
#> Establish _targets.R and _targets_r/targets/combined_nowcasts.R.
  • Extract summarised daily nowcast. As a temporary measure here we adjust quantiles that are more than 25% of the median when there is evidence of a fitting issue (based on divergent transistions and R hat values).
tar_target(summarised_nowcast, {
  combined_nowcasts |> 
    adjust_posteriors(
      target = "daily", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
  ) |> 
    unnest_nowcasts(target = "daily") 
})
#> Define target summarised_nowcast from chunk code.
#> Establish _targets.R and _targets_r/targets/summarised_nowcast.R.
  • Extract summarised 7 day nowcast. As a temporary measure here we adjust quantiles that are more than 25% of the median when there is evidence of a fitting issue (based on divergent transistions and R hat values).
tar_target(summarised_7day_nowcast, {
  combined_nowcasts |> 
    adjust_posteriors(
      target = "seven_day", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
  ) |> 
    unnest_nowcasts(target = "seven_day") 
})
#> Define target summarised_7day_nowcast from chunk code.
#> Establish _targets.R and _targets_r/targets/summarised_7day_nowcast.R.
  • Save daily nowcasts stratified by nowcasting date.
tar_file(
  save_daily_nowcasts,
  summarised_nowcast[nowcast_date == nowcast_dates] |>
    save_csv(
      filename = paste0(nowcast_dates, ".csv"),
      path = here("data/nowcasts/daily"),
      allow_empty = FALSE
    ),
  map(nowcast_dates),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/save_daily_nowcasts.R.
  • Save 7 day nowcasts stratified by nowcasting date
tar_file(
  save_7day_nowcasts,
  summarised_7day_nowcast[nowcast_date == nowcast_dates] |>
    save_csv(
      filename = paste0(nowcast_dates, ".csv"),
      path = here("data/nowcasts/seven_day"),
      allow_empty = FALSE
    ),
  map(nowcast_dates),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/save_7day_nowcasts.R.
  • Define and format the hierarchical nowcast for submission. As a temporary measure here we adjust quantiles that are more than 25% of the median when there is evidence of a fitting issue (based on divergent transistions and R hat values).
tar_target(
  hierarchical_submission_nowcast,
  age_week_nowcast[nowcast_date == nowcast_dates] |>
    adjust_posteriors(
      target = "seven_day", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
    ) |>
    select_var("seven_day") |>
    rbindlist() |>
    format_for_submission(),
  map(nowcast_dates),
  iteration = "list",
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/hierarchical_submission_nowcast.R.
  • Format and save 7 day hierarchical nowcasts for hub submission.
tar_file(
  save_hierarchical_submission,
  save_csv(
    hierarchical_submission_nowcast,
      filename = paste0(nowcast_dates, ".csv"),
      path = here("data/nowcasts/submission/hierarchical")
  ),
  map(hierarchical_submission_nowcast, nowcast_dates)
)
#> Establish _targets.R and _targets_r/targets/save_hierarchical_submission.R.
  • Define and format the independent nowcast for submission. As a temporary measure here we adjust quantiles that are more than 25% of the median when there is evidence of a fitting issue (based on divergent transistions and R hat values). On the 6th of December the submission model was updated to include a reference day of the week effect.
tar_target(
  independent_submission_nowcast,
  rbind(
    independent_ref_dow_nowcast[nowcast_date == nowcast_dates],
    overall_only_nowcast[nowcast_date == nowcast_dates]
  ) |> 
   adjust_posteriors(
      target = "seven_day", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
    ) |>
    select_var("seven_day") |>
    rbindlist() |>
    format_for_submission(),
  map(nowcast_dates),
  iteration = "list",
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/independent_submission_nowcast.R.
  • Format and save 7 day independent nowcasts for hub submission.
tar_file(
  save_independent_submission,
  save_csv(
    independent_submission_nowcast,
    filename = paste0(nowcast_dates, ".csv"),
    path = here("data/nowcasts/submission/independent")
  ),
  map(independent_submission_nowcast, nowcast_dates)
)
#> Establish _targets.R and _targets_r/targets/save_independent_submission.R.
  • Save latest hospitalisation data
tar_file(
  save_latest_daily_hospitalisations,
  save_csv(
    latest_hospitalisations,
    filename = paste0("daily.csv"),
    path = here("data/observations")
  )
)
#> Establish _targets.R and _targets_r/targets/save_latest_daily_hospitalisations.R.
  • Save 7 day hospitalisation data
tar_file(
  save_latest_7day_hospitalisations,
  save_csv(
    latest_7day_hospitalisations,
    filename = paste0("seven_day.csv"),
    path = here("data/observations")
  )
)
#> Establish _targets.R and _targets_r/targets/save_latest_7day_hospitalisations.R.

Evaluation

  • Extract and save model fitting diagnostics.
tar_target(diagnostics, {
  combined_nowcasts[, c("daily", "seven_day") := NULL]
})
#> Define target diagnostics from chunk code.
#> Establish _targets.R and _targets_r/targets/diagnostics.R.
list(
  tar_file(
    save_all_diagnostics,
      save_csv(
        diagnostics,
        filename = "all.csv",
        path = here("data/diagnostics")
      )
  ),
  tar_file(
    save_high_rhat_diagnostics,
      save_csv(
        diagnostics[max_rhat > 1.05],
        filename = "high-rhat.csv",
        path = here("data/diagnostics")
      )
  ),
  tar_file(
    save_high_divergent_transitions,
      save_csv(
        diagnostics[per_divergent_transitions > 0.1],
        filename = "high-divergent-transitions.csv",
        path = here("data/diagnostics")
      )
  ),
  tar_file(
    save_failures,
      save_csv(
        diagnostics[failure == TRUE],
        filename = "fitting-failed.csv",
        path = here("data/diagnostics")
      )
  )
)
#> Establish _targets.R and _targets_r/targets/save_diagnostics.R.
  • Extract and save model run-time at the national level aggregated across age-group.
tar_file(
  save_run_time,
  combined_nowcasts |>
    summarise_runtimes() |>
    save_csv(
      filename = "run-times.csv",
      path = here("data/diagnostics")
    )
)
#> Establish _targets.R and _targets_r/targets/save-run-time.R.
  • Filter nowcasts to only include those with “complete” (more than 28 days of reports) data for all age groupsand with horizons between 0 days and -7 days from the nowcast date.
tar_target(scored_nowcasts, {
  summarised_nowcast[location == locations][
    reference_date < (max(nowcast_date) - 28)][,
    holiday := NULL][,
    horizon := as.numeric(as.Date(reference_date) - nowcast_date)][
    horizon >= -7
  ]
})
#> Define target scored_nowcasts from chunk code.
#> Establish _targets.R and _targets_r/targets/scored_nowcasts.R.
  • Score daily nowcasts overall, by age group, by horizon, by nowcast date, and by reference date on both the natural and log scales (corresponding to absolute and relative scoring). These summarised scores are then saved to data/scores.
tar_map(
  list(score_by = list(
    "overall", "age_group", "horizon", "reference_date", "nowcast_date"
  )),
  tar_target(
    scores,
    enw_score_nowcast(
      scored_nowcasts, complete_hospitalisations, 
      summarise_by = drop_string(c(score_by, "model"), "overall"),
      log = FALSE
    )
  ),
  tar_target(
    log_scores,
    enw_score_nowcast(
      scored_nowcasts, complete_hospitalisations, 
      summarise_by = drop_string(c(score_by, "model"), "overall"),
      log = TRUE
    )
  ),
  tar_file(
    save_scores,
    save_csv(
      rbind(scores[, scale := "natural"], log_scores[, scale := "log"]),
      filename = paste0(paste(score_by, sep = "-"), ".csv"),
      path = here("data/scores")
    )
  )
)
#> Establish _targets.R and _targets_r/targets/score.R.

Visualise

Currently these are not produced. See the real time report instead

  • Plot most recent daily nowcast by location, age group, and model.
tar_target(
  plot_latest_nowcast,
  enw_plot_nowcast_quantiles(
    summarised_nowcast[nowcast_date == max(nowcast_date)][
                       location == locations][
                       reference_date >= (nowcast_date - 28)]
  ) +
  facet_grid(vars(age_group), vars(model), scales = "free_y"),
  map(locations),
  iteration = "list"
)
  • Plot most recent seven day nowcast by location, age group, and model.
tar_target(
  plot_latest_7day_nowcast,
  enw_plot_nowcast_quantiles(
    summarised_7day_nowcast[nowcast_date == max(nowcast_date)][
                            location == locations][
                            reference_date >= (nowcast_date - 28)]
  ) +
  facet_grid(vars(age_group), vars(model), scales = "free_y"),
  map(locations),
  iteration = "list"
)
  • Plot daily nowcasts at each horizon from the date of the nowcast to 7 days previously by location, age group and model.
tar_map(
  list(horizons = 0:7),
  tar_target(
    plot_nowcast_horizon,
    enw_plot_nowcast_quantiles(
      scored_nowcasts[location == locations][horizon == -horizons][,
                      confirm := NA],
      latest_obs = latest_hospitalisations[
        location == locations][
        reference_date >= min(scored_nowcasts$reference_date)][
        reference_date <= max(scored_nowcasts$reference_date)
      ],
      log = TRUE
    ) +
    facet_grid(vars(age_group), vars(model), scales = "free_y"),
    map(locations),
    iteration = "list"
  )
)
  • Plot 7 day nowcasts at each horizon from the date of the nowcast to 7 days previously by location, age group and model.
tar_map(
  list(horizons = 0:7),
  tar_target(
    plot_7day_nowcast_horizon,
    enw_plot_nowcast_quantiles(
      summarised_7day_nowcast[
        reference_date < (max(nowcast_date) - 28)][,
        holiday := NULL][,
        horizon := as.numeric(as.Date(reference_date) - nowcast_date)][
        location == locations][
        horizon == -horizons][,
        confirm := NA],
      latest_obs = latest_7day_hospitalisations[
        location == locations][
        reference_date >= min(scored_nowcasts$reference_date)][
        reference_date <= max(scored_nowcasts$reference_date)
      ],
      log = TRUE
    ) +
    facet_grid(vars(age_group), vars(model), scales = "free_y"),
    map(locations),
    iteration = "list"
  )
)
  • Plot daily nowcasts for locations without age groups stratified nowcasts.

  • Plot 7 day nowcasts for locations without age group stratified nowcasts.

  • Plot scores relative to the baseline by location and age group on both the natural and log scale.