diff --git a/.Rbuildignore b/.Rbuildignore index f4fb55f0..738e4ea6 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -14,3 +14,4 @@ man-roxygen/* ^Meta$ ^README.Rmd$ ^revdep$ +^data-raw$ diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml new file mode 100644 index 00000000..40c2381a --- /dev/null +++ b/.github/workflows/benchmark.yaml @@ -0,0 +1,225 @@ +on: + workflow_dispatch + + +name: "⏱️ Benchmark" +jobs: + benchmark: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:latest + env: + POSTGRES_DB: test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: --health-cmd "pg_isready -U postgres" --health-interval 10s --health-timeout 5s --health-retries 5 + + env: + PGHOST: localhost + PGPORT: 5432 + PGDATABASE: test + PGUSER: postgres + PGPASSWORD: postgres + + steps: + - name: Install a SQL Server suite of tools + uses: potatoqualitee/mssqlsuite@v1.7 + with: + install: sqlengine, sqlpackage, sqlclient + show-log: true + + - name: Configure SQL server + run: | + set -o xtrace + sqlcmd -V 10 -S localhost -U SA -P dbatools.I0 -Q "ALTER LOGIN SA WITH DEFAULT_DATABASE = master;" + + + + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + persist-credentials: false + + - name: Configure git + run: | + git config --local user.name "$GITHUB_ACTOR" + git config --local user.email "$GITHUB_ACTOR@users.noreply.github.com" + git switch ${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}} + + + + - uses: r-lib/actions/setup-r@v2 + with: + use-public-rspm: true + + - uses: r-lib/actions/setup-r-dependencies@v2 + with: + extra-packages: any::devtools + + + - name: Delete previous benchmark files + if: always() + run: rm -rf inst/extdata/benchmark-*.rds + + + + - name: Get SQLite version + run: | + version=$(Rscript -e "cat(DBI::dbGetQuery(DBI::dbConnect(RSQLite::SQLite()), 'SELECT sqlite_version();')[[1]])") + echo "SQLITE_VERSION=SQLite v$version" >> $GITHUB_ENV + + - name: Get DuckDB version + run: | + version=$(Rscript -e "cat(DBI::dbGetQuery(DBI::dbConnect(duckdb::duckdb()), 'SELECT version();')[[1]])") + echo "DUCKDB_VERSION=DuckDB $version" >> $GITHUB_ENV + + - name: Get PostgreSQL version + run: | + version=$(psql --version | awk '{print $3}') + echo "POSTGRES_VERSION=PostgreSQL v$version" >> $GITHUB_ENV + + - name: Get SQL Server version + run: | + version=$(sqlcmd -S localhost -U SA -P dbatools.I0 -Q "SET NOCOUNT ON; SELECT SERVERPROPERTY('productversion') AS version" -h -1 -W -b) + echo "SQL_SERVER_VERSION=SQL Server v$version" >> $GITHUB_ENV + + + - name: Install libraries to benchmark + if: always() + run: source("./data-raw/benchmark.R", echo=TRUE) + shell: Rscript {0} + + + + - name: Run benchmark (${{ env.SQLITE_VERSION }}) + if: always() + env: + BACKEND: ${{ env.SQLITE_VERSION }} + BACKEND_DRV: RSQLite::SQLite + BACKEND_ARGS: 'list(dbname = file.path(tempdir(), "SQLite.SQLite"))' + run: source("./data-raw/benchmark.R", echo=TRUE) + shell: Rscript {0} + + - name: Run benchmark (${{ env.DUCKDB_VERSION }}) + if: always() + env: + BACKEND: ${{ env.DUCKDB_VERSION }} + BACKEND_DRV: duckdb::duckdb + BACKEND_ARGS: 'list(dbdir = file.path(tempdir(), "DuckDB.duckdb"))' + run: source("./data-raw/benchmark.R", echo=TRUE) + shell: Rscript {0} + + - name: Run benchmark (${{ env.POSTGRES_VERSION }}) + if: always() + env: + BACKEND: ${{ env.POSTGRES_VERSION }} + BACKEND_DRV: RPostgres::Postgres + run: source("./data-raw/benchmark.R", echo=TRUE) + shell: Rscript {0} + + - name: Run benchmark (${{ env.SQL_SERVER_VERSION }}) 
+ if: always() + env: + BACKEND: ${{ env.SQL_SERVER_VERSION }} + BACKEND_DRV: odbc::odbc + CONN_ARGS_JSON: > + { + "${{ env.SQL_SERVER_VERSION }}": { + "driver": "ODBC Driver 17 for SQL Server", + "server": "localhost", + "database": "master", + "UID": "SA", + "PWD": "dbatools.I0" + } + } + run: source("./data-raw/benchmark.R", echo=TRUE) + shell: Rscript {0} + + + + - name: Display structure of benchmark files + if: always() + run: ls -R data + + - name: Combine benchmark results + if: always() + run: | + benchmark_files <- list.files( + "data", + pattern = "^benchmark-", + full.names = TRUE, + recursive = TRUE + ) + + benchmarks <- benchmark_files |> + purrr::map(readRDS) |> + purrr::map(tibble::as_tibble) |> + purrr::reduce(rbind) + + benchmarks <- benchmarks |> + dplyr::mutate( + "version" = factor( + .data$version, + levels = c("CRAN", "main", setdiff(unique(benchmarks$version), c("CRAN", "main"))) + ) + ) + + # Save the combined benchmark results and delete the individual files + dir.create(file.path("inst", "extdata"), recursive = TRUE, showWarnings = FALSE) + saveRDS(benchmarks, file.path("inst", "extdata", "benchmarks.rds")) + file.remove(benchmark_files) + + + # Add note slow backends + slow_backends <- benchmarks |> + dplyr::distinct(.data$database, .data$n) |> + dplyr::filter(.data$n < max(.data$n)) |> + dplyr::pull("database") + + benchmarks <- benchmarks |> + dplyr::mutate("database" = paste0(database, ifelse(database %in% slow_backends, "*", ""))) + + + # Mean and standard deviation (see ggplot2::mean_se()) + mean_sd <- function(x) { + mu <- mean(x) + sd <- sd(x) + data.frame(y = mu, ymin = mu - sd, ymax = mu + sd) + } + + g <- ggplot2::ggplot( + benchmarks, + ggplot2::aes(x = version, y = time / 1e9) + ) + + ggplot2::stat_summary(fun.data = mean_sd, geom = "pointrange", size = 0.5, linewidth = 1) + + ggplot2::facet_grid(rows = ggplot2::vars(benchmark_function), cols = ggplot2::vars(database)) + + ggplot2::labs(x = "Codebase version", y = "Time (s)") + + if (length(slow_backends) > 1) { + g <- g + ggplot2::labs(caption = "* IMPORTANT: Benchmark data halved for this backend!") + } + + ggplot2::ggsave("benchmarks.pdf") + + shell: Rscript {0} + + - name: Upload benchmark summary + if: always() + uses: actions/upload-artifact@v4 + with: + name: benchmark-summary + path: benchmarks.pdf + + - name: Commit and push changes + run: | + git remote set-url origin https://$GITHUB_ACTOR:${{ secrets.GITHUB_TOKEN }}@github.com/$GITHUB_REPOSITORY.git + git stash --include-untracked + git pull --rebase origin ${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}} + git stash list | grep stash@{0} && git stash pop || echo "No stash to pop" + git add inst/extdata/\* + git commit -m "chore: Update benchmark data" || echo "No changes to commit" + git push origin ${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}} diff --git a/DESCRIPTION b/DESCRIPTION index 3915a93d..71f1ca05 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: SCDB Type: Package Title: Easily Access and Maintain Time-Based Versioned Data (Slowly-Changing-Dimension) -Version: 0.4.1 +Version: 0.4.1.9000 Authors@R: c(person("Rasmus Skytte", "Randl\U00F8v", , "rske@ssi.dk", role = c("aut", "cre", "rev"), @@ -21,6 +21,8 @@ License: GPL-3 Encoding: UTF-8 RoxygenNote: 7.3.2 Roxygen: list(markdown = TRUE, r6 = TRUE) +Depends: + R (>= 3.5.0) Imports: checkmate, DBI, @@ -42,10 +44,14 @@ Suggests: callr, conflicted, duckdb, + ggplot2, + here, jsonlite, knitr, lintr, + microbenchmark, odbc, + pak, rmarkdown, roxygen2, pkgdown, diff --git 
a/NAMESPACE b/NAMESPACE index 87d37c02..5940bd53 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,6 +1,9 @@ # Generated by roxygen2: do not edit by hand S3method(as.character,Id) +S3method(create_index,DBIConnection) +S3method(create_index,PqConnection) +S3method(create_index,SQLiteConnection) S3method(db_timestamp,"NULL") S3method(db_timestamp,SQLiteConnection) S3method(db_timestamp,default) @@ -54,6 +57,7 @@ S3method(tidyr::unite,tbl_dbi) export(Logger) export(LoggerNull) export(close_connection) +export(create_index) export(create_logs_if_missing) export(create_table) export(db_timestamp) diff --git a/NEWS.md b/NEWS.md index 1f083bf4..98cdb7aa 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,18 @@ +# SCDB (development version) + +## New features + +* Added function `create_index` to allow easy creating of an index on a table (#137). + +## Improvements and Fixes + +* `update_snapshot()` has been optimized and now runs faster on all the supported backends (#137). + +## Documentation + +* A vignette including benchmarks of `update_snapshot()` across various backends is added (#138). + + # SCDB 0.4.1 ## Improvements and Fixes @@ -14,6 +29,7 @@ * Improved tests for `get_tables()` (#145). + # SCDB 0.4.0 ## BREAKING CHANGES: diff --git a/R/connection.R b/R/connection.R index cb8abc90..bf329ed4 100644 --- a/R/connection.R +++ b/R/connection.R @@ -155,6 +155,14 @@ get_connection.OdbcDriver <- function( checkmate::assert_choice(timezone_out, OlsonNames(), null.ok = TRUE, add = coll) checkmate::reportAssertions(coll) + # Recommend batch processing for ODBC connections + if (is.null(getOption("odbc.batch_rows"))) { + message( + "Transfer of large data sets may be slow. ", + "Consider using options(\"odbc.batch_rows\" = 1000) to speed up transfer." + ) + } + # Check if connection can be established given these settings status <- do.call(DBI::dbCanConnect, args = args) if (!status) stop(attr(status, "reason")) diff --git a/R/create_index.R b/R/create_index.R new file mode 100644 index 00000000..674e163b --- /dev/null +++ b/R/create_index.R @@ -0,0 +1,76 @@ +#' Create the indexes on table +#' @param conn (`DBIConnection`)\cr +#' A connection to a database. +#' @template db_table +#' @param columns (`character()`)\cr +#' The columns that should be unique. +#' @return +#' NULL (called for side effects) +#' @examplesIf requireNamespace("RSQLite", quietly = TRUE) +#' conn <- get_connection() +#' +#' mt <- dplyr::copy_to(conn, dplyr::distinct(mtcars, .data$mpg, .data$cyl), name = "mtcars") +#' create_index(conn, mt, c("mpg", "cyl")) +#' +#' close_connection(conn) +#' @export +create_index <- function(conn, db_table, columns) { + checkmate::assert_class(conn, "DBIConnection") + assert_id_like(db_table) + checkmate::assert_character(columns) + checkmate::assert_true(table_exists(conn, db_table)) + + UseMethod("create_index") +} + +#' @export +create_index.PqConnection <- function(conn, db_table, columns) { + db_table <- id(db_table, conn) + + DBI::dbExecute( + conn, + glue::glue( + "CREATE UNIQUE INDEX ON {as.character(db_table, explicit = TRUE)} ({toString(columns)})" + ) + ) +} + +#' @export +create_index.SQLiteConnection <- function(conn, db_table, columns) { + db_table <- id(db_table, conn) + + schema <- purrr::pluck(db_table, "name", "schema") + table <- purrr::pluck(db_table, "name", "table") + + if (schema %in% c("main", "temp")) schema <- NULL + + # Generate index name + index <- paste( + c( + shQuote(schema), + shQuote(paste0(c(table, "scdb_index", columns), collapse = "_")) + ), + collapse = "." 
+ ) + + DBI::dbExecute( + conn, + glue::glue( + "CREATE UNIQUE INDEX {index} ON {shQuote(table)} ({toString(columns)})" + ) + ) +} + +#' @export +create_index.DBIConnection <- function(conn, db_table, columns) { + db_table <- id(db_table, conn) + + index <- glue::glue("{db_table}_scdb_index_{paste(columns, collapse = '_')}") |> + stringr::str_replace_all(stringr::fixed("."), "_") + + query <- glue::glue( + "CREATE UNIQUE INDEX {index} ON {as.character(db_table, explicit = TRUE)} ({toString(columns)})" + ) + + DBI::dbExecute(conn, query) +} diff --git a/R/create_table.R b/R/create_table.R index 0273c703..8c0cb9f5 100644 --- a/R/create_table.R +++ b/R/create_table.R @@ -86,5 +86,7 @@ create_table <- function(.data, conn = NULL, db_table, ...) { ... ) + create_index(conn, db_table_id, columns = c("checksum", "from_ts")) + return(invisible(dplyr::tbl(conn, db_table_id))) } diff --git a/R/update_snapshot.R b/R/update_snapshot.R index 10646837..24a8671a 100644 --- a/R/update_snapshot.R +++ b/R/update_snapshot.R @@ -68,6 +68,8 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me checkmate::assert_multi_class(logger, "Logger", null.ok = TRUE) checkmate::assert_logical(enforce_chronological_order) + + ### Create target table if not exists # Retrieve Id from any valid db_table inputs to correctly create a missing table db_table_id <- id(db_table, conn) @@ -82,7 +84,7 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me db_table <- create_table(dplyr::collect(utils::head(.data, 0)), conn, db_table_id, temporary = FALSE) } - # Initialize logger + ### Initialize logger if (is.null(logger)) { logger <- Logger$new( db_table = db_table_id, @@ -101,7 +103,8 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me } - # Opening checks + + ### Check incoming data if (!is.historical(db_table)) { # Release table lock @@ -131,12 +134,15 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me logger$log_info("Parsing data for table", as.character(db_table_id), "started", tic = tic) # Use input time in log logger$log_info("Given timestamp for table is", timestamp, tic = tic) # Use input time in log - # Check for current update status + + + + ### Check for current update status db_latest <- db_table |> dplyr::summarize(max(.data$from_ts, na.rm = TRUE)) |> dplyr::pull() |> as.character() |> - max("1900-01-01 00:00:00", na.rm = TRUE) + dplyr::coalesce("1900-01-01 00:00:00") # Convert timestamp to character to prevent inconsistent R behavior with date/timestamps timestamp <- strftime(timestamp) @@ -152,7 +158,9 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me "timestamp in table:", db_latest, tic = tic) # Use input time in log } - # Compute .data immediately to reduce runtime and compute checksum + + + ### Filter and compute checksums for incoming data .data <- .data |> dplyr::ungroup() |> filter_keys(filters) |> @@ -168,14 +176,7 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me .data <- dplyr::compute(digest_to_checksum(.data, col = "checksum")) defer_db_cleanup(.data) - # Apply filter to current records - if (!is.null(filters) && !identical(dbplyr::remote_con(filters), conn)) { - filters <- dplyr::copy_to(conn, filters, name = unique_table_name()) - defer_db_cleanup(filters) - } - db_table <- filter_keys(db_table, filters) - - # Determine the next timestamp in the data (can be NA if none is found) + ### Determine the next timestamp 
in the data (can be NA if none is found) next_timestamp <- min(db_table |> dplyr::filter(.data$from_ts > timestamp) |> dplyr::summarize(next_timestamp = min(.data$from_ts, na.rm = TRUE)) |> @@ -186,124 +187,165 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me dplyr::pull("next_timestamp")) |> strftime() - # Consider only records valid at timestamp (and apply the filter if present) + + + ### Consider only records valid at timestamp (and apply the filter if present) db_table <- slice_time(db_table, timestamp) - # Count open rows at timestamp - nrow_open <- nrow(db_table) + # Apply filter to current records + if (!is.null(filters) && !identical(dbplyr::remote_con(filters), conn)) { + filters <- dplyr::copy_to(conn, filters, name = unique_table_name()) + defer_db_cleanup(filters) + } + db_table <- filter_keys(db_table, filters) + + + + # Determine records to remove and to add to the DB + # Records will be removed if their checksum no longer exists on the new date + # Records will be added if their checksum does not exists in the current data + + # Generate SQL at lower level than tidyverse to get the affected rows without computing. + slice_ts <- db_timestamp(timestamp, conn) - # Select only data with no until_ts and with different values in any fields + currently_valid_checksums <- db_table |> + dplyr::select("checksum") + + + ## Deactivation logger$log_info("Deactivating records") - if (nrow_open > 0) { - to_remove <- dplyr::setdiff(dplyr::select(db_table, "checksum"), - dplyr::select(.data, "checksum")) |> - dplyr::compute() # Something has changed in dbplyr (2.2.1) that makes this compute needed. - # Code that takes 20 secs with can be more than 30 minutes to compute without... - defer_db_cleanup(to_remove) - nrow_to_remove <- nrow(to_remove) - # Determine from_ts and checksum for the records we need to deactivate - to_remove <- to_remove |> - dplyr::left_join(dplyr::select(db_table, "from_ts", "checksum"), by = "checksum") |> - dplyr::mutate(until_ts = !!db_timestamp(timestamp, conn)) + checksums_to_deactivate <- dplyr::setdiff(currently_valid_checksums, dplyr::select(.data, "checksum")) - } else { - nrow_to_remove <- 0 - } + sql_deactivate <- dbplyr::sql_query_update_from( + con = conn, + table = dbplyr::as.sql(db_table_id, con = conn), + from = dbplyr::sql_render(checksums_to_deactivate), + by = "checksum", + update_values = c("until_ts" = slice_ts) + ) logger$log_info("After to_remove") + # Commit changes to DB + rs_deactivate <- DBI::dbSendQuery(conn, sql_deactivate) + n_deactivations <- DBI::dbGetRowsAffected(rs_deactivate) + DBI::dbClearResult(rs_deactivate) + logger$log_to_db(n_deactivations = !!n_deactivations) + logger$log_info("Deactivate records count:", n_deactivations) - to_add <- dplyr::setdiff(.data, dplyr::select(db_table, colnames(.data))) |> - dplyr::mutate(from_ts = !!db_timestamp(timestamp, conn), - until_ts = !!db_timestamp(next_timestamp, conn)) - nrow_to_add <- nrow(to_add) + ## Insertion + logger$log_info("Adding new records") + + records_to_insert <- dbplyr::build_sql( + con = conn, + "SELECT *, ", db_timestamp(timestamp, conn), " AS ", dbplyr::ident("from_ts"), ", ", + db_timestamp(next_timestamp, conn), " AS ", dbplyr::ident("until_ts"), + " FROM ", dbplyr::remote_table(.data), + " WHERE ", dbplyr::remote_table(.data), ".", dbplyr::ident("checksum"), + " NOT IN (", dbplyr::sql_render(currently_valid_checksums), ")" + ) + + sql_insert <- dbplyr::sql_query_insert( + con = conn, + table = dbplyr::as.sql(db_table_id, con = conn), + 
from = records_to_insert, + insert_cols = c(colnames(.data), "from_ts", "until_ts"), + by = c("checksum", "from_ts"), + conflict = "ignore" + ) logger$log_info("After to_add") + # Commit changes to DB + rs_insert <- DBI::dbSendQuery(conn, sql_insert) + n_insertions <- DBI::dbGetRowsAffected(rs_insert) + DBI::dbClearResult(rs_insert) + logger$log_to_db(n_insertions = !!n_insertions) + logger$log_info("Insert records count:", n_insertions) - if (nrow_to_remove > 0) { - dplyr::rows_update(x = dplyr::tbl(conn, db_table_id), - y = to_remove, - by = c("checksum", "from_ts"), - in_place = TRUE, - unmatched = "ignore") - } + # If chronological order is not enforced, some records may be split across several records + # checksum is the same, and from_ts / until_ts are continuous + # We collapse these records here + if (!enforce_chronological_order) { - logger$log_to_db(n_deactivations = nrow_to_remove) # Logs contains the aggregate number of added records on the day - logger$log_info("Deactivate records count:", nrow_to_remove) - logger$log_info("Adding new records") + # First we identify the records with this stitching + consecutive_rows <- dplyr::inner_join( + dplyr::tbl(conn, db_table_id), + dplyr::tbl(conn, db_table_id) |> dplyr::select("checksum", "from_ts", "until_ts"), + suffix = c("", ".p"), + sql_on = paste( + '"RHS"."checksum" = "LHS"."checksum" AND ', + '("LHS"."until_ts" = "RHS"."from_ts" OR "LHS"."from_ts" = "RHS"."until_ts")' + ) + ) - if (nrow_to_add > 0) { - dplyr::rows_append(x = dplyr::tbl(conn, db_table_id), y = to_add, in_place = TRUE) + # If the record has the earlier from_ts, we use the until_ts of the other. + # If the record has the later from_ts, we set until_ts equal to from_ts to trigger + # clean up later in update_snapshot. + consecutive_rows_fix <- consecutive_rows |> + dplyr::mutate("until_ts" = ifelse(.data$from_ts < .data$from_ts.p, .data$until_ts.p, .data$from_ts)) |> + dplyr::select(!tidyselect::ends_with(".p")) + + if (inherits(conn, "duckdb_connection")) { + # For duckdb the lower level translation fails + # dbplyr 2.5.0, duckdb 0.10.2 + consecutive_rows_fix <- dplyr::compute(consecutive_rows_fix) + defer_db_cleanup(consecutive_rows_fix) + n_consecutive <- nrow(consecutive_rows_fix) / 2 + + dplyr::rows_update( + x = dplyr::tbl(conn, db_table_id), + y = consecutive_rows_fix, + by = c("checksum", "from_ts"), + unmatched = "ignore", + in_place = TRUE + ) + + } else { + sql_fix_consecutive <- dbplyr::sql_query_upsert( + con = conn, + table = dbplyr::as.sql(db_table_id, con = conn), + from = dbplyr::sql_render(consecutive_rows_fix), + by = c("checksum", "from_ts"), + update_cols = "until_ts" + ) + + # Commit changes to DB + rs_fix_consecutive <- DBI::dbSendQuery(conn, sql_fix_consecutive) + n_consecutive <- DBI::dbGetRowsAffected(rs_fix_consecutive) / 2 + DBI::dbClearResult(rs_fix_consecutive) + } + logger$log_info("Doubly updated records removed:", n_consecutive) } - logger$log_to_db(n_insertions = nrow_to_add) - logger$log_info("Insert records count:", nrow_to_add) # If several updates come in a single day, some records may have from_ts = until_ts. 
+ # Alternatively, the above handling of consecutive records will make records have from_ts = until_ts # We remove these records here redundant_rows <- dplyr::tbl(conn, db_table_id) |> dplyr::filter(.data$from_ts == .data$until_ts) |> dplyr::select("checksum", "from_ts") - nrow_redundant <- nrow(redundant_rows) - - if (nrow_redundant > 0) { - dplyr::rows_delete(dplyr::tbl(conn, db_table_id), - redundant_rows, - by = c("checksum", "from_ts"), - in_place = TRUE, unmatched = "ignore") - logger$log_info("Doubly updated records removed:", nrow_redundant) - } - - # If chronological order is not enforced, some records may be split across several records - # checksum is the same, and from_ts / until_ts are continuous - # We collapse these records here - if (!enforce_chronological_order) { - redundant_rows <- dplyr::tbl(conn, db_table_id) |> - filter_keys(filters) - redundant_rows <- dplyr::inner_join( - redundant_rows, - redundant_rows |> dplyr::select("checksum", "from_ts", "until_ts"), - suffix = c("", ".p"), - sql_on = '"RHS"."checksum" = "LHS"."checksum" AND "LHS"."until_ts" = "RHS"."from_ts"' - ) |> - dplyr::select(!"checksum.p") - - redundant_rows_to_delete <- redundant_rows |> - dplyr::transmute(.data$checksum, from_ts = .data$from_ts.p) |> - dplyr::compute() - defer_db_cleanup(redundant_rows_to_delete) - - redundant_rows_to_update <- redundant_rows |> - dplyr::transmute(.data$checksum, from_ts = .data$from_ts, until_ts = .data$until_ts.p) |> - dplyr::compute() - defer_db_cleanup(redundant_rows_to_update) - - if (nrow(redundant_rows_to_delete) > 0) { - dplyr::rows_delete(x = dplyr::tbl(conn, db_table_id), - y = redundant_rows_to_delete, - by = c("checksum", "from_ts"), - in_place = TRUE, - unmatched = "ignore") - } + sql_fix_redundant <- dbplyr::sql_query_delete( + con = conn, + table = dbplyr::as.sql(db_table_id, con = conn), + from = dbplyr::sql_render(redundant_rows), + by = c("checksum", "from_ts") + ) - if (nrow(redundant_rows_to_update) > 0) { - dplyr::rows_update(x = dplyr::tbl(conn, db_table_id), - y = redundant_rows_to_update, - by = c("checksum", "from_ts"), - in_place = TRUE, - unmatched = "ignore") - logger$log_info("Continous records collapsed:", nrow(redundant_rows_to_update)) - } + # Commit changes to DB + rs_fix_redundant <- DBI::dbSendQuery(conn, sql_fix_redundant) + n_redundant <- DBI::dbGetRowsAffected(rs_fix_redundant) / 2 + DBI::dbClearResult(rs_fix_redundant) + logger$log_info("Continuous records collapsed:", n_redundant) - } + # Clean up toc <- Sys.time() logger$finalize_db_entry() logger$log_info("Finished processing for table", as.character(db_table_id), tic = toc) diff --git a/_pkgdown.yml b/_pkgdown.yml index 0f6ba803..31512158 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -24,6 +24,7 @@ reference: - title: Create or manage tables contents: - create_table + - create_index - create_logs_if_missing - update_snapshot - filter_keys diff --git a/data-raw/benchmark.R b/data-raw/benchmark.R new file mode 100644 index 00000000..3e0f2a5a --- /dev/null +++ b/data-raw/benchmark.R @@ -0,0 +1,183 @@ +withr::local_options("odbc.batch_rows" = 1000) + +# Install extra dependencies +lib_paths_default <- .libPaths() +pak::pkg_install("jsonlite") +pak::pkg_install("microbenchmark") +pak::pkg_install("here") + +# Load the connection helper +source("tests/testthat/helper-setup.R") + +# Install all needed package versions +for (version in c("CRAN", "main", "branch")) { + branch <- system("git symbolic-ref --short HEAD", intern = TRUE) + sha <- system("git rev-parse HEAD", intern = 
TRUE) + if (version == "branch" && branch == "main") { + next + } + + source <- dplyr::case_when( + version == "CRAN" ~ "SCDB", + version == "main" ~ "ssi-dk/SCDB", + version == "branch" ~ glue::glue("ssi-dk/SCDB@{sha}") + ) + + lib_dir <- dplyr::case_when( + version == "CRAN" ~ "SCDB", + version == "main" ~ "ssi-dk-SCDB", + version == "branch" ~ glue::glue("ssi-dk-SCDB-{sha}") + ) + + lib_path <- here::here("installations", lib_dir) + dir.create(lib_path, showWarnings = FALSE) + + # Install the missing packages + .libPaths(c(lib_path, lib_paths_default)) + pak::lockfile_create(source, "SCDB.lock") + missing <- jsonlite::fromJSON("SCDB.lock")$packages$ref |> + purrr::discard(rlang::is_installed) + if (length(missing) > 0) pak::pkg_install(missing, lib = lib_path) +} + + +# Return early if no back-end is defined +if (identical(Sys.getenv("CI"), "true") && identical(Sys.getenv("BACKEND"), "")) { + message("No backend defined, skipping benchmark!") +} else { + + # Then loop over each and benchmark the update_snapshot function + for (version in c("CRAN", "main", "branch")) { + branch <- system("git symbolic-ref --short HEAD", intern = TRUE) + sha <- system("git rev-parse HEAD", intern = TRUE) + if (version == "branch" && branch == "main") { + next + } + + source <- dplyr::case_when( + version == "CRAN" ~ "SCDB", + version == "main" ~ "ssi-dk/SCDB", + version == "branch" ~ glue::glue("ssi-dk/SCDB@{sha}") + ) + + lib_dir <- dplyr::case_when( + version == "CRAN" ~ "SCDB", + version == "main" ~ "ssi-dk-SCDB", + version == "branch" ~ glue::glue("ssi-dk-SCDB-{sha}") + ) + + .libPaths(c(here::here("installations", lib_dir), lib_paths_default)) + library("SCDB") + + # Open connection to the database + conns <- get_test_conns() + conn <- conns[[1]] + + + # Our benchmark data is the iris data set but repeated to increase the data size + data_generator <- function(repeats) { + purrr::map( + seq(repeats), + \(it) dplyr::mutate(iris, r = dplyr::row_number() + (it - 1) * nrow(iris)) + ) |> + purrr::reduce(rbind) |> + dplyr::rename_with(~ tolower(gsub(".", "_", .x, fixed = TRUE))) + } + + # Benchmark 1, update_snapshot() with consecutive updates + try({ + n <- 10 + data_1 <- data_generator(n) + data_2 <- data_generator(2 * n) |> + dplyr::mutate( + "sepal_length" = dplyr::if_else( + .data$sepal_length > median(.data$sepal_length), + .data$sepal_length, + .data$sepal_length / 2 + ) + ) + data_3 <- data_generator(3 * n) |> + dplyr::mutate( + "sepal_length" = dplyr::if_else( + .data$sepal_length > median(.data$sepal_length), + .data$sepal_length, + .data$sepal_length / 2 + ), + "sepal_width" = dplyr::if_else( + .data$sepal_width > median(.data$sepal_width), + .data$sepal_width, + .data$sepal_width / 2 + ) + ) + + # Copy data to the conns + data_on_conn <- list( + dplyr::copy_to(conn, data_1, name = id("test.SCDB_data_1", conn), overwrite = TRUE, temporary = FALSE), + dplyr::copy_to(conn, data_2, name = id("test.SCDB_data_2", conn), overwrite = TRUE, temporary = FALSE), + dplyr::copy_to(conn, data_3, name = id("test.SCDB_data_3", conn), overwrite = TRUE, temporary = FALSE) + ) + + # Define the data to loop over for benchmark + ts <- list("2021-01-01", "2021-01-02", "2021-01-03") + + # Define the SCDB update functions + scdb_update_step <- function(conn, data, ts) { + update_snapshot(data, conn, "SCDB_benchmark_1", timestamp = ts, + logger = Logger$new(output_to_console = FALSE, warn = FALSE)) + } + + scdb_updates <- function(conn, data_on_conn) { + purrr::walk2(data_on_conn, ts, \(data, ts) scdb_update_step(conn, 
data, ts)) + DBI::dbRemoveTable(conn, name = "SCDB_benchmark_1") + } + + # Construct the list of benchmarks + update_snapshot_benchmark <- microbenchmark::microbenchmark(scdb_updates(conn, data_on_conn), times = 25) |> + dplyr::mutate( + "benchmark_function" = "update_snapshot()", + "database" = names(conns)[[1]], + "version" = !!ifelse(version == "branch", substr(sha, 1, 10), version), + "n" = n + ) + + dir.create("data", showWarnings = FALSE) + saveRDS(update_snapshot_benchmark, glue::glue("data/benchmark-update_snapshot_{names(conns)[[1]]}_{version}.rds")) + }) + + # Benchmark 2, update_snapshot() with increasing data size + try({ + for (n in floor(10^(seq(1, 3, length.out = 5)))) { + + data <- data_generator(n) |> + dplyr::copy_to(conn, df = _, name = id("test.SCDB_data", conn), overwrite = TRUE, temporary = FALSE) + + # Define the SCDB update function + scdb_updates <- function(conn, data) { + update_snapshot(data, conn, "SCDB_benchmark_2", timestamp = "2021-01-01", + logger = Logger$new(output_to_console = FALSE, warn = FALSE)) + DBI::dbRemoveTable(conn, name = "SCDB_benchmark_2") + } + + # Construct the list of benchmarks + update_snapshot_benchmark <- microbenchmark::microbenchmark(scdb_updates(conn, data), times = 5) |> + dplyr::mutate( + "benchmark_function" = "update_snapshot() - complexity", + "database" = names(conns)[[1]], + "version" = !!ifelse(version == "branch", substr(sha, 1, 10), version), + "n" = n + ) + + dir.create("data", showWarnings = FALSE) + saveRDS( + update_snapshot_benchmark, + glue::glue("data/benchmark-update_snapshot_complexity_{n}_{names(conns)[[1]]}_{version}.rds") + ) + } + + # Clean up + purrr::walk(conns, ~ DBI::dbDisconnect(., shutdown = TRUE)) + }) + + detach("package:SCDB", unload = TRUE) + } +} diff --git a/inst/WORDLIST b/inst/WORDLIST index ab331f9b..28b15669 100644 --- a/inst/WORDLIST +++ b/inst/WORDLIST @@ -71,6 +71,8 @@ sql SQLiteConnections SSI Statens +summarises +superlinear Sys tbl diff --git a/inst/extdata/benchmarks.rds b/inst/extdata/benchmarks.rds new file mode 100644 index 00000000..47076795 Binary files /dev/null and b/inst/extdata/benchmarks.rds differ diff --git a/man/create_index.Rd b/man/create_index.Rd new file mode 100644 index 00000000..aedbca22 --- /dev/null +++ b/man/create_index.Rd @@ -0,0 +1,34 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/create_index.R +\name{create_index} +\alias{create_index} +\title{Create the indexes on table} +\usage{ +create_index(conn, db_table, columns) +} +\arguments{ +\item{conn}{(\code{DBIConnection})\cr +A connection to a database.} + +\item{db_table}{(\verb{id-like object(1)})\cr +A table specification (coercible by \code{id()}).} + +\item{columns}{(\code{character()})\cr +The columns that should be unique.} +} +\value{ +NULL (called for side effects) +} +\description{ +Create the indexes on table +} +\examples{ +\dontshow{if (requireNamespace("RSQLite", quietly = TRUE)) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} + conn <- get_connection() + + mt <- dplyr::copy_to(conn, dplyr::distinct(mtcars, .data$mpg, .data$cyl), name = "mtcars") + create_index(conn, mt, c("mpg", "cyl")) + + close_connection(conn) +\dontshow{\}) # examplesIf} +} diff --git a/pak.lock b/pak.lock index 850abba4..96a02e9c 100644 --- a/pak.lock +++ b/pak.lock @@ -1494,6 +1494,37 @@ "needscompilation": false, "repotype": "cranlike" }, + { + "ref": "here", + "package": "here", + "version": "1.0.1", + "type": "standard", + "direct": false, + "binary": true, + 
"dependencies": "rprojroot", + "vignettes": false, + "metadata": { + "RemotePkgRef": "here", + "RemoteType": "standard", + "RemoteRef": "here", + "RemoteRepos": "https://packagemanager.posit.co/cran/__linux__/jammy/latest", + "RemotePkgPlatform": "x86_64-pc-linux-gnu-ubuntu-22.04", + "RemoteSha": "1.0.1" + }, + "sources": "https://packagemanager.posit.co/cran/__linux__/jammy/latest/src/contrib/here_1.0.1.tar.gz", + "target": "src/contrib/x86_64-pc-linux-gnu-ubuntu-22.04/4.4/here_1.0.1.tar.gz", + "platform": "x86_64-pc-linux-gnu-ubuntu-22.04", + "rversion": "4.4", + "directpkg": false, + "license": "MIT + file LICENSE", + "dep_types": ["Depends", "Imports", "LinkingTo"], + "params": [], + "install_args": "", + "sysreqs": "", + "sysreqs_packages": {}, + "needscompilation": false, + "repotype": "cranlike" + }, { "ref": "highr", "package": "highr", @@ -2143,7 +2174,7 @@ { "ref": "local::.", "package": "SCDB", - "version": "0.4.1", + "version": "0.4.1.9000", "type": "local", "direct": true, "binary": false, @@ -2154,7 +2185,7 @@ "RemoteType": "local" }, "sources": "file:///home/runner/work/SCDB/SCDB", - "target": "src/contrib/SCDB_0.4.1.tar.gz", + "target": "src/contrib/SCDB_0.4.1.9000.tar.gz", "platform": "source", "rversion": "*", "directpkg": true, @@ -2258,6 +2289,37 @@ "needscompilation": false, "repotype": "cranlike" }, + { + "ref": "microbenchmark", + "package": "microbenchmark", + "version": "1.5.0", + "type": "standard", + "direct": false, + "binary": true, + "dependencies": [], + "vignettes": false, + "metadata": { + "RemotePkgRef": "microbenchmark", + "RemoteType": "standard", + "RemoteRef": "microbenchmark", + "RemoteRepos": "https://packagemanager.posit.co/cran/__linux__/jammy/latest", + "RemotePkgPlatform": "x86_64-pc-linux-gnu-ubuntu-22.04", + "RemoteSha": "1.5.0" + }, + "sources": "https://packagemanager.posit.co/cran/__linux__/jammy/latest/src/contrib/microbenchmark_1.5.0.tar.gz", + "target": "src/contrib/x86_64-pc-linux-gnu-ubuntu-22.04/4.4/microbenchmark_1.5.0.tar.gz", + "platform": "x86_64-pc-linux-gnu-ubuntu-22.04", + "rversion": "4.4", + "directpkg": false, + "license": "BSD_2_clause + file LICENSE", + "dep_types": ["Depends", "Imports", "LinkingTo"], + "params": [], + "install_args": "", + "sysreqs": "On a Unix-alike, one of the C functions\n mach_absolute_time (macOS), clock_gettime or gethrtime. 
If none\n of these is found, the obsolescent POSIX function gettimeofday\n will be tried.", + "sysreqs_packages": {}, + "needscompilation": false, + "repotype": "cranlike" + }, { "ref": "mime", "package": "mime", @@ -2433,6 +2495,37 @@ "needscompilation": false, "repotype": "cranlike" }, + { + "ref": "pak", + "package": "pak", + "version": "0.8.0", + "type": "standard", + "direct": false, + "binary": true, + "dependencies": [], + "vignettes": false, + "metadata": { + "RemotePkgRef": "pak", + "RemoteType": "standard", + "RemoteRef": "pak", + "RemoteRepos": "https://packagemanager.posit.co/cran/__linux__/jammy/latest", + "RemotePkgPlatform": "x86_64-pc-linux-gnu-ubuntu-22.04", + "RemoteSha": "0.8.0" + }, + "sources": "https://packagemanager.posit.co/cran/__linux__/jammy/latest/src/contrib/pak_0.8.0.tar.gz", + "target": "src/contrib/x86_64-pc-linux-gnu-ubuntu-22.04/4.4/pak_0.8.0.tar.gz", + "platform": "x86_64-pc-linux-gnu-ubuntu-22.04", + "rversion": "4.4", + "directpkg": false, + "license": "GPL-3", + "dep_types": ["Depends", "Imports", "LinkingTo"], + "params": [], + "install_args": "", + "sysreqs": "", + "sysreqs_packages": {}, + "needscompilation": false, + "repotype": "cranlike" + }, { "ref": "parallelly", "package": "parallelly", diff --git a/tests/spelling.R b/tests/spelling.R index 33ef2c73..c2600f16 100644 --- a/tests/spelling.R +++ b/tests/spelling.R @@ -1,3 +1,3 @@ -if (requireNamespace("spelling", quietly = TRUE)) - spelling::spell_check_test(vignettes = TRUE, error = FALSE, - skip_on_cran = TRUE) +if (requireNamespace("spelling", quietly = TRUE)) { + spelling::spell_check_test(vignettes = TRUE, error = FALSE, skip_on_cran = TRUE) +} diff --git a/tests/testthat/test-Logger.R b/tests/testthat/test-Logger.R index 3c1b7ef1..bdb4553f 100644 --- a/tests/testthat/test-Logger.R +++ b/tests/testthat/test-Logger.R @@ -91,7 +91,7 @@ test_that("Logger: logging to file works", { # Test logging to file has the right formatting and message type - logger$log_info("test filewriting", tic = logger$start_time) + expect_no_message(logger$log_info("test filewriting", tic = logger$start_time)) tryCatch(logger$log_warn("test filewriting", tic = logger$start_time), warning = \(w) NULL) tryCatch(logger$log_error("test filewriting", tic = logger$start_time), error = \(e) NULL) @@ -132,7 +132,7 @@ test_that("Logger: logging to file works", { # Test logging to file still works - logger$log_info("test filewriting", tic = logger$start_time) + expect_no_message(logger$log_info("test filewriting", tic = logger$start_time)) ts_str <- format(logger$start_time, "%F %R:%OS3") expect_true(logger$log_filename %in% dir(log_path)) diff --git a/tests/testthat/test-update_snapshot.R b/tests/testthat/test-update_snapshot.R index 400aa548..2d8fb472 100644 --- a/tests/testthat/test-update_snapshot.R +++ b/tests/testthat/test-update_snapshot.R @@ -1,27 +1,25 @@ -test_that("update_snapshot() works", { +test_that("update_snapshot() can handle first snapshot", { for (conn in get_test_conns()) { if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn)) if (DBI::dbExistsTable(conn, id("test.SCDB_logs", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_logs", conn)) + expect_false(table_exists(conn, "test.SCDB_tmp1")) + expect_false(table_exists(conn, "test.SCDB_logs")) - target <- mtcars |> - dplyr::copy_to(conn, df = _, name = unique_table_name()) |> - digest_to_checksum(col = "checksum") |> - dplyr::mutate(from_ts = !!db_timestamp("2022-10-01 09:00:00", conn), - until_ts = 
!!db_timestamp(NA, conn)) - - # Copy target to conn - target <- dplyr::copy_to(conn, target, name = id("test.SCDB_tmp1", conn), overwrite = TRUE, temporary = FALSE) - + # Use unmodified mtcars as the initial snapshot .data <- mtcars |> - dplyr::mutate(hp = dplyr::if_else(hp > 130, hp - 10, hp)) |> dplyr::copy_to(conn, df = _, name = unique_table_name()) - # This is a simple update where 23 rows are replaced with 23 new ones on the given date + # Configure the logger for this update db_table <- "test.SCDB_tmp1" - timestamp <- "2022-10-03 09:00:00" + timestamp <- "2022-10-01 09:00:00" log_path <- tempdir() + # Ensure all logs are removed + dir(log_path) |> + purrr::keep(~ endsWith(., ".log")) |> + purrr::walk(~ unlink(file.path(log_path, .))) + logger <- Logger$new( db_table = db_table, timestamp = timestamp, @@ -31,31 +29,24 @@ test_that("update_snapshot() works", { output_to_console = FALSE ) - dir(log_path) |> - purrr::keep(~ endsWith(., ".log")) |> - purrr::walk(~ unlink(file.path(log_path, .))) - + # Update update_snapshot(.data, conn, db_table, timestamp, logger = logger) - expect_identical(slice_time(target, "2022-10-01 09:00:00") |> - dplyr::select(!c("from_ts", "until_ts", "checksum")) |> - dplyr::collect() |> - dplyr::arrange(wt, qsec), - mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble()) - expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")), - nrow(mtcars)) - - expect_identical(slice_time(target, "2022-10-03 09:00:00") |> - dplyr::select(!c("from_ts", "until_ts", "checksum")) |> - dplyr::collect() |> - dplyr::arrange(wt, qsec), - .data |> dplyr::collect() |> dplyr::arrange(wt, qsec)) - expect_equal(nrow(slice_time(target, "2022-10-03 09:00:00")), - nrow(mtcars)) + # Confirm snapshot is transferred correctly + expect_identical( + get_table(conn, db_table) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + .data |> + dplyr::collect() |> + dplyr::arrange(wt, qsec) + ) + + ### For this test, we also check that the log output is correct ### # Check file log outputs exists log_pattern <- glue::glue("{stringr::str_replace_all(as.Date(timestamp), '-', '_')}.{id(db_table, conn)}.log") - log_file <- purrr::keep(dir(log_path), ~stringr::str_detect(., log_pattern)) + log_file <- purrr::keep(dir(log_path), ~ stringr::str_detect(., log_pattern)) expect_length(log_file, 1) expect_gt(file.info(file.path(log_path, log_file))$size, 0) expect_identical(nrow(get_table(conn, "test.SCDB_logs")), 1) @@ -85,37 +76,174 @@ test_that("update_snapshot() works", { if (inherits(conn, "SQLiteConnection")) { types <- types |> purrr::map_if(~ identical(., "POSIXct"), "character") |> # SQLite does not support POSIXct - purrr::map_if(~ identical(., "logical"), "numneric") |> # SQLite does not support logical + purrr::map_if(~ identical(., "logical"), "numeric") |> # SQLite does not support logical as.character() } checkmate::expect_data_frame(logs, nrows = 1, types) + # Check the content of the log table + expect_identical(as.character(logs$date), as.character(timestamp)) + + db_table_id <- id(db_table, conn) + if ("catalog" %in% colnames(logs)) expect_identical(logs$catalog, purrr::pluck(db_table_id, "name", "catalog")) + expect_identical(logs$schema, purrr::pluck(db_table_id, "name", "schema")) + expect_identical(logs$table, purrr::pluck(db_table_id, "name", "table")) + + expect_identical(logs$n_insertions, nrow(mtcars)) + expect_identical(logs$n_deactivations, 0L) + expect_true(as.logical(logs$success)) + expect_identical(logs$message, NA_character_) + + + # Clean up the logs + 
unlink(logger$log_realpath) + + close_connection(conn) + } +}) + +test_that("update_snapshot() can add a new snapshot", { + for (conn in get_test_conns()) { + + # Modify snapshot and run update step + .data <- mtcars |> + dplyr::mutate(hp = dplyr::if_else(hp > 130, hp - 10, hp)) |> + dplyr::copy_to(conn, df = _, name = unique_table_name()) + + # Configure the logger for this update + db_table <- "test.SCDB_tmp1" + timestamp <- "2022-10-03 09:00:00" + + logger <- Logger$new( + db_table = db_table, + timestamp = timestamp, + log_path = NULL, + log_table_id = "test.SCDB_logs", + log_conn = conn, + output_to_console = FALSE + ) + + + # Update + # This is a simple update where 15 rows are replaced with 15 new ones on the given date + update_snapshot(.data, conn, db_table, timestamp, logger = logger) + + # Check the snapshot has updated correctly + target <- dplyr::tbl(conn, id(db_table, conn)) + expect_identical( + slice_time(target, "2022-10-01 09:00:00") |> + dplyr::select(!c("from_ts", "until_ts", "checksum")) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + mtcars |> + dplyr::arrange(wt, qsec) |> + tibble::as_tibble() + ) + expect_equal( + nrow(slice_time(target, "2022-10-01 09:00:00")), + nrow(mtcars) + ) + + expect_identical( + slice_time(target, "2022-10-03 09:00:00") |> + dplyr::select(!c("from_ts", "until_ts", "checksum")) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + .data |> + dplyr::collect() |> + dplyr::arrange(wt, qsec) + ) + expect_equal( + nrow(slice_time(target, "2022-10-03 09:00:00")), + nrow(mtcars) + ) + + + # Check database log output + logs <- get_table(conn, "test.SCDB_logs") |> + dplyr::collect() |> + utils::tail(1) + + expect_identical(logs$n_insertions, 15L) + expect_identical(logs$n_deactivations, 15L) + expect_true(as.logical(logs$success)) + + close_connection(conn) + } +}) + +test_that("update_snapshot() can update a snapshot on an existing date", { + for (conn in get_test_conns()) { # We now attempt to do another update on the same date .data <- mtcars |> dplyr::mutate(hp = dplyr::if_else(hp > 100, hp - 10, hp)) |> dplyr::copy_to(conn, df = _, name = unique_table_name()) - update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-03 09:00:00", logger = logger) + # Configure the logger for this update + db_table <- "test.SCDB_tmp1" + timestamp <- "2022-10-03 09:00:00" + + logger <- Logger$new( + db_table = db_table, + timestamp = timestamp, + log_path = NULL, + log_table_id = "test.SCDB_logs", + log_conn = conn, + output_to_console = FALSE + ) + + + # This is a more complicated update where a further 8 rows are replaced with 8 new ones on the same date as before + update_snapshot(.data, conn, db_table, "2022-10-03 09:00:00", logger = logger) # Even though we insert twice on the same date, we expect the data to be minimal (compacted) - expect_identical(slice_time(target, "2022-10-01 09:00:00") |> - dplyr::select(!c("from_ts", "until_ts", "checksum")) |> - dplyr::collect() |> - dplyr::arrange(wt, qsec), - mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble()) - expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")), - nrow(mtcars)) - - expect_identical(slice_time(target, "2022-10-03 09:00:00") |> - dplyr::select(!c("from_ts", "until_ts", "checksum")) |> - dplyr::collect() |> - dplyr::arrange(wt, qsec), - .data |> dplyr::collect() |> dplyr::arrange(wt, qsec)) - expect_equal(nrow(slice_time(target, "2022-10-03 09:00:00")), - nrow(mtcars)) + target <- dplyr::tbl(conn, id(db_table, conn)) + expect_identical( + slice_time(target, "2022-10-01 09:00:00") 
|> + dplyr::select(!c("from_ts", "until_ts", "checksum")) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + mtcars |> + dplyr::arrange(wt, qsec) |> + tibble::tibble() + ) + expect_equal( + nrow(slice_time(target, "2022-10-01 09:00:00")), + nrow(mtcars) + ) + + expect_identical( + slice_time(target, "2022-10-03 09:00:00") |> + dplyr::select(!c("from_ts", "until_ts", "checksum")) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + .data |> + dplyr::collect() |> + dplyr::arrange(wt, qsec) + ) + expect_equal( + nrow(slice_time(target, "2022-10-03 09:00:00")), + nrow(mtcars) + ) + + # Check database log output + logs <- get_table(conn, "test.SCDB_logs") |> + dplyr::collect() |> + utils::tail(1) + + expect_identical(logs$n_insertions, 8L) + expect_identical(logs$n_deactivations, 8L) + expect_true(as.logical(logs$success)) + + close_connection(conn) + } +}) + +test_that("update_snapshot() can insert a snapshot between existing dates", { + for (conn in get_test_conns()) { # We now attempt to an update between these two updates .data <- mtcars |> @@ -124,35 +252,58 @@ test_that("update_snapshot() works", { # This should fail if we do not specify "enforce_chronological_order = FALSE" expect_error( - update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00", logger = logger), + update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00", logger = LoggerNull$new()), regexp = "Given timestamp 2022-10-02 09:00:00 is earlier" ) # But not with the variable set update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00", - logger = logger, enforce_chronological_order = FALSE) + logger = LoggerNull$new(), enforce_chronological_order = FALSE) + + + target <- dplyr::tbl(conn, id("test.SCDB_tmp1", conn)) + expect_identical( + slice_time(target, "2022-10-01 09:00:00") |> + dplyr::select(!c("from_ts", "until_ts", "checksum")) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + mtcars |> + dplyr::arrange(wt, qsec) |> + tibble::tibble() + ) + expect_equal( + nrow(slice_time(target, "2022-10-01 09:00:00")), + nrow(mtcars) + ) + + expect_identical( + slice_time(target, "2022-10-02 09:00:00") |> + dplyr::select(!c("from_ts", "until_ts", "checksum")) |> + dplyr::collect() |> + dplyr::arrange(wt, qsec), + .data |> + dplyr::collect() |> + dplyr::arrange(wt, qsec) + ) + expect_equal( + nrow(slice_time(target, "2022-10-02 09:00:00")), + nrow(mtcars) + ) - expect_identical(slice_time(target, "2022-10-01 09:00:00") |> - dplyr::select(!c("from_ts", "until_ts", "checksum")) |> - dplyr::collect() |> - dplyr::arrange(wt, qsec), - mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble()) - expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")), - nrow(mtcars)) + close_connection(conn) + } +}) - expect_identical(slice_time(target, "2022-10-02 09:00:00") |> - dplyr::select(!c("from_ts", "until_ts", "checksum")) |> - dplyr::collect() |> - dplyr::arrange(wt, qsec), - .data |> dplyr::collect() |> dplyr::arrange(wt, qsec)) - expect_equal(nrow(slice_time(target, "2022-10-02 09:00:00")), - nrow(mtcars)) +test_that("update_snapshot() works (holistic test 1)", { + for (conn in get_test_conns()) { + if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn)) expect_false(table_exists(conn, "test.SCDB_tmp1")) - # Check deletion of redundant rows + + # Create test data for the test t0 <- data.frame(col1 = c("A", "B"), col2 = c(NA_real_, NA_real_)) t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, NA_real_, NA_real_)) t2 <- data.frame(col1 = 
c("A", "B", "C"), col2 = c(1, 2, 3)) @@ -162,7 +313,7 @@ test_that("update_snapshot() works", { t1 <- dplyr::copy_to(conn, t1, name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE) t2 <- dplyr::copy_to(conn, t2, name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE) - + logger <- LoggerNull$new() update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger) expect_identical(dplyr::collect(t0) |> dplyr::arrange(col1), dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1)) @@ -190,11 +341,30 @@ test_that("update_snapshot() works", { expect_identical(t, t_ref) + close_connection(conn) + } +}) + +test_that("update_snapshot() works (holistic test 2)", { + for (conn in get_test_conns()) { + if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn)) + expect_false(table_exists(conn, "test.SCDB_tmp1")) + + + # Create test data for the test + t0 <- data.frame(col1 = c("A", "B"), col2 = c(NA_real_, NA_real_)) + t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, NA_real_, NA_real_)) + t2 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, 2, 3)) + # Copy t0, t1, and t2 to conn (and suppress check_from message) + t0 <- dplyr::copy_to(conn, t0, name = id("test.SCDB_t0", conn), overwrite = TRUE, temporary = FALSE) + t1 <- dplyr::copy_to(conn, t1, name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE) + t2 <- dplyr::copy_to(conn, t2, name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE) # Check non-chronological insertion + logger <- LoggerNull$new() update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger) expect_identical(dplyr::collect(t0) |> dplyr::arrange(col1), dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1)) @@ -213,25 +383,74 @@ test_that("update_snapshot() works", { from_ts = c("2022-01-01", "2022-01-01", "2022-02-01", "2022-02-01", "2022-03-01", "2022-03-01"), until_ts = c("2022-02-01", "2022-03-01", NA, "2022-03-01", NA, NA)) - expect_identical(get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) |> - dplyr::select(!"checksum") |> - dplyr::collect() |> - dplyr::mutate(from_ts = strftime(from_ts), - until_ts = strftime(until_ts)) |> - dplyr::arrange(col1, from_ts), - t_ref |> - dplyr::arrange(col1, from_ts)) + expect_identical( + get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) |> + dplyr::select(!"checksum") |> + dplyr::collect() |> + dplyr::mutate(from_ts = strftime(from_ts), + until_ts = strftime(until_ts)) |> + dplyr::arrange(col1, from_ts), + t_ref |> + dplyr::arrange(col1, from_ts) + ) - if (file.exists(logger$log_realpath)) file.remove(logger$log_realpath) + if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn)) + + connection_clean_up(conn) + } +}) + + +test_that("update_snapshot() handles 'NULL' updates", { + for (conn in get_test_conns()) { if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn)) + if (DBI::dbExistsTable(conn, id("test.SCDB_logs", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_logs", conn)) + + # Use mtcars as the test data set + .data <- mtcars |> + dplyr::copy_to(conn, df = _, name = unique_table_name()) + defer_db_cleanup(.data) + + # This is a simple update where 23 rows are replaced with 23 new ones on the given date + db_table <- "test.SCDB_tmp1" + + create_logger <- \(timestamp) { + Logger$new( + db_table = db_table, + timestamp = timestamp, + log_path = NULL, + 
log_table_id = "test.SCDB_logs", + log_conn = conn, + output_to_console = FALSE + ) + } + + # Update the table with update_snapshot() and store the results + update_snapshot(.data, conn, db_table, "2022-10-03 09:00:00", logger = create_logger("2022-10-03 09:00:00")) + target_data_1 <- get_table(conn, db_table, slice_ts = NULL) |> dplyr::collect() + + # Update the table with the same data again update_snapshot() and store the results + update_snapshot(.data, conn, db_table, "2022-10-04 09:00:00", logger = create_logger("2022-10-04 09:00:00")) + target_data_2 <- get_table(conn, db_table, slice_ts = NULL) |> dplyr::collect() + + # Check that the two updates are identical + expect_identical(target_data_1, target_data_2) + + # Confirm with logs that no updates have been made + logs <- get_table(conn, id("test.SCDB_logs", conn)) |> + dplyr::collect() |> + dplyr::arrange(date) + + expect_identical(logs$n_insertions, c(nrow(mtcars), 0L)) + expect_identical(logs$n_deactivations, c(0L, 0L)) connection_clean_up(conn) } }) -test_that("update_snapshot works with Id objects", { +test_that("update_snapshot() works with Id objects", { withr::local_options("SCDB.log_path" = NULL) # No file logging for (conn in get_test_conns()) { @@ -264,7 +483,7 @@ test_that("update_snapshot works with Id objects", { }) -test_that("update_snapshot checks table formats", { +test_that("update_snapshot() checks table formats", { withr::local_options("SCDB.log_path" = tempdir()) @@ -311,7 +530,7 @@ test_that("update_snapshot checks table formats", { }) -test_that("update_snapshot works with across connection", { +test_that("update_snapshot() works with across connection", { skip_if_not_installed("RSQLite") withr::local_options("SCDB.log_path" = NULL) # No file logging diff --git a/vignettes/benchmarks.Rmd b/vignettes/benchmarks.Rmd new file mode 100644 index 00000000..15e01fe5 --- /dev/null +++ b/vignettes/benchmarks.Rmd @@ -0,0 +1,157 @@ +--- +title: "SCDB: Benchmarks" +output: rmarkdown::html_vignette +bibliography: "references.bib" +vignette: > + %\VignetteIndexEntry{SCDB: Benchmarks} + %\VignetteEngine{knitr::rmarkdown} + %\VignetteEncoding{UTF-8} +--- + +```{r, include = FALSE} +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>" +) + +# NOTE: +# To re-run the benchmarks, run the "benchmark" workflow on GitHub +``` + +To showcase the performance of `SCDB` on different database backends, we include this vignette that summarises a simple +benchmark: +A sample dataset is created based on the `datasets::iris` dataset. This data is repeated 10 times and given a unique ID +(the row number of the data). + +This data forms the basis for three "snapshots" used in the benchmarks: + +1) The data as described above. + +2) As 1. but where any `Sepal.Length` below the median is halved. + +3) As 2. but where any `Sepal.width` below the median is halved. + +The benchmark function uses three consecutive calls to `update_snapshot()` to create the table with first snapshot and +then update it to the second and third snapshot. Finally, the table is deleted. + +The performance of this benchmark function is timed with the `microbenchmark` package using 10 replicates. +All benchmarks are run on the same machine. + +The results of the benchmark are shown graphically below (mean and standard deviation), where we compare the current +development version of `SCDB` with the current CRAN version. 
+```{r benchmark_preprocessing, echo = FALSE, eval = requireNamespace("here")}
+benchmark_location <- c(
+  system.file("extdata", "benchmarks.rds", package = "SCDB"),
+  here::here("inst", "extdata", "benchmarks.rds")
+) |>
+  purrr::discard(~ identical(., "")) |>
+  purrr::pluck(1)
+
+benchmarks <- readRDS(benchmark_location)
+
+# Determine if the SHA is on main
+sha <- benchmarks |>
+  dplyr::distinct(version) |>
+  dplyr::filter(!(version %in% c("CRAN", "main", "branch"))) |>
+  dplyr::pull("version")
+
+# Check local git history
+on_main <- tryCatch({
+  system(glue::glue("git branch main --contains {sha}"), intern = TRUE) |>
+    stringr::str_detect(stringr::fixed("main")) |>
+    isTRUE()
+}, warning = function(w) {
+  # If on GitHub, git is not installed and we assume TRUE.
+  # This will render the vignette as it will look once merged onto main.
+  return(identical(Sys.getenv("CI"), "true"))
+})
+
+# If the SHA has been merged, use it as the "main" version and remove the other, older, main version
+if (on_main) {
+  benchmarks <- benchmarks |>
+    dplyr::filter(version != "main") |>
+    dplyr::mutate(version = dplyr::if_else(.data$version == "CRAN", "CRAN", "development"))
+}
+
+# Mean and standard deviation (see ggplot2::mean_se())
+mean_sd <- function(x) {
+  mu <- mean(x)
+  sd <- sd(x)
+  data.frame(y = mu, ymin = mu - sd, ymax = mu + sd)
+}
+```
+
+```{r benchmark_1, echo = FALSE, eval = requireNamespace("here")}
+# Use data for benchmark 1
+benchmark_1 <- benchmarks |>
+  dplyr::filter(!stringr::str_ends(.data$benchmark_function, stringr::fixed("complexity")))
+
+# Add a note for slow backends
+slow_backends <- benchmark_1 |>
+  dplyr::distinct(.data$database, .data$n) |>
+  dplyr::filter(.data$n < max(.data$n)) |>
+  dplyr::pull("database")
+
+benchmark_1 <- benchmark_1 |>
+  dplyr::mutate("database" = paste0(database, ifelse(database %in% slow_backends, "*", "")))
+
+# Insert newline into database name to improve rendering of figures
+labeller <- ggplot2::as_labeller(\(l) stringr::str_replace_all(l, stringr::fixed(" v"), "\nv"))
+
+g <- ggplot2::ggplot(
+  benchmark_1,
+  ggplot2::aes(x = version, y = time / 1e9)
+) +
+  ggplot2::stat_summary(fun.data = mean_sd, geom = "pointrange", size = 0.5, linewidth = 1) +
+  ggplot2::facet_grid(
+    rows = ggplot2::vars(benchmark_function),
+    cols = ggplot2::vars(database),
+    labeller = labeller
+  ) +
+  ggplot2::labs(x = "Codebase version", y = "Time (s)")
+
+if (length(slow_backends) > 0) {
+  g <- g + ggplot2::labs(caption = "* IMPORTANT: Benchmark data halved for this backend!")
+}
+
+g
+```
+
+We include another benchmark to highlight the complexity scaling of `update_snapshot()` with the size of the input
+data. The datasets are similar to the first benchmark, but the number of repeats is varied to see the impact of
+increasing data size. The benchmarks are run from a "clean" state, where the target table does not exist. The benchmark
+measures both the time to create the table and to remove it again afterwards (to restore the clean state).
+
+The performance of this benchmark function is timed with the `microbenchmark` package using 5 replicates.
+All benchmarks are run on the same machine.
+
+The results of the benchmark are shown graphically below (mean and standard deviation) and with linear scaling (dotted
+line), where we compare the current development version of `SCDB` with the current CRAN version.
+
+NOTE: There are reports of a superlinear complexity for very large data sets.
+If you experience such problems, consider batching the updates via the `filters` argument.
+
+```{r benchmark_2, echo = FALSE, eval = requireNamespace("here")}
+# Use data for benchmark 2
+benchmark_2 <- benchmarks |>
+  dplyr::filter(stringr::str_ends(.data$benchmark_function, stringr::fixed("complexity"))) |>
+  dplyr::mutate("benchmark_function" = stringr::str_remove_all(benchmark_function, stringr::fixed("- complexity")))
+
+ggplot2::ggplot(
+  benchmark_2,
+  ggplot2::aes(x = n * nrow(iris) / 1e3, y = time / 1e9, color = version)
+) +
+  ggplot2::stat_summary(fun.data = mean_sd, geom = "pointrange", size = 0.5, linewidth = 1) +
+  ggplot2::geom_smooth(method = "lm", formula = y ~ x, se = FALSE, linetype = 3) +
+  ggplot2::facet_grid(
+    rows = ggplot2::vars(benchmark_function),
+    cols = ggplot2::vars(database),
+    labeller = labeller
+  ) +
+  ggplot2::labs(x = "Data size (1,000 rows)", y = "Time (s)", color = "Codebase version") +
+  ggplot2::theme(panel.spacing = grid::unit(1, "lines"), legend.position = "bottom")
+```
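+
+As noted above, very large updates can be batched via the `filters` argument of `update_snapshot()`. The sketch below
+shows the general pattern under illustrative assumptions (the `region` key column, the table names and an already
+established connection `conn` are placeholders): each batch restricts both the incoming data and the target table to a
+single key value, so every call only has to compare checksums within that subset.
+
+```{r batched_updates, eval = FALSE}
+# Incoming data already on the connection (placeholder table name)
+incoming <- dplyr::tbl(conn, "incoming_snapshot")
+
+# One batch per value of the key column
+keys <- incoming |>
+  dplyr::distinct(region) |>
+  dplyr::pull()
+
+for (key in keys) {
+  update_snapshot(
+    .data = dplyr::filter(incoming, region == !!key),
+    conn = conn,
+    db_table = "target_table",
+    timestamp = "2021-01-01",
+    filters = data.frame(region = key)
+  )
+}
+```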