Skip to content

Commit

Permalink
Simplify worker logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Sep 26, 2023
1 parent 10644ea commit 42e0d8e
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 93 deletions.
2 changes: 1 addition & 1 deletion R/crew_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ crew_class_controller <- R6::R6Class(
#' compatible with the analogous method of controller groups.
launch = function(n = 1L, controllers = NULL) {
# `controllers` is not used in this body; per the roxygen text above it
# appears to exist only for compatibility with controller groups.
# Refresh the launcher's worker bookkeeping before making decisions.
self$launcher$tally()
# NOTE(review): the next two statements look like the old and new forms of
# the same step (rotating the websockets of done workers) shown together by
# a diff view. Confirm that only one of them belongs in the real file.
walk(x = self$launcher$done(), f = self$launcher$rotate)
self$launcher$rotate()
# Launch at most `n` of the workers the launcher reports as unlaunched.
walk(x = self$launcher$unlaunched(n = n), f = self$launcher$launch)
# Called for its side effects only, so return NULL invisibly.
invisible()
},
Expand Down
96 changes: 58 additions & 38 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -425,41 +425,6 @@ crew_class_launcher <- R6::R6Class(
complete = .subset2(workers, "complete")
)
},
#' @description Get done workers.
#' @details A worker is "done" if it is launched and inactive.
#' A worker is "launched" if `launch()` was called
#' and the worker websocket has not been rotated since.
#' If a worker is currently online, then it is not inactive.
#' If a worker is not currently online, then it is inactive
#' if and only if (1) either it connected to the current
#' websocket at some point in the past,
#' or (2) `seconds_launch` seconds elapsed since launch.
#' @return Integer index of inactive workers.
done = function() {
  # Seconds elapsed since each worker's recorded launch time
  # (NA where no launch time is recorded).
  launch_time <- self$workers$start
  elapsed <- (nanonext::mclock() / 1000) - launch_time
  # A worker is still inside its startup grace period if it has a launch
  # time and fewer than `seconds_launch` seconds have passed since then.
  still_launching <- !is.na(launch_time) & (elapsed < self$seconds_launch)
  # Active means online, or never discovered but still within the grace
  # period. Everything else is inactive.
  is_active <- self$workers$online |
    (still_launching & !self$workers$discovered)
  # Report positions of workers that are both launched and inactive.
  which(self$workers$launched & !is_active)
},
#' @details Rotate a websocket.
#' @return `NULL` (invisibly).
#' @param index Integer index of a worker.
rotate = function(index) {
  # Request a new websocket for worker slot `index`.
  # NOTE(review): a NULL result is treated as "nothing was rotated";
  # confirm against the mirai::saisei() documentation.
  socket <- mirai::saisei(i = index, force = FALSE, .compute = self$name)
  if (!is.null(socket)) {
    # Shut down whatever worker is still attached to the old websocket,
    # unless the stored handle is the crew null placeholder.
    handle <- self$workers$handle[[index]]
    if (!is_crew_null(handle)) {
      self$terminate_worker(handle)
    }
    # Record the new socket and mark the slot as available for relaunch.
    self$workers$socket[index] <- socket
    self$workers$launched[index] <- FALSE
  }
  # Always return NULL invisibly, as documented. Without this the method
  # returned the value of the last assignment whenever a socket was rotated.
  invisible()
},
#' @description Update the `daemons`-related columns of the internal
#' `workers` data frame.
#' @return `NULL` (invisibly).
Expand All @@ -473,12 +438,67 @@ crew_class_launcher <- R6::R6Class(
self$workers$complete <- as.integer(daemons[, "complete"])
invisible()
},
#' @description Get workers available for launch.
#' @description Get indexes of unlaunched workers.
#' @details A worker is "unlaunched" if it has never connected
#' to the current instance of its websocket. Once a worker
#' launches with the `launch()` method, it is considered "launched"
#' until it disconnects and its websocket is rotated with `rotate()`.
#' @return Integer index of workers available for launch.
#' @param n Maximum number of worker indexes to return.
unlaunched = function(n = Inf) {
  # Positions of all workers whose `launched` flag is FALSE.
  not_launched <- which(!self$workers$launched)
  # Keep at most the first `n` of them.
  head(x = not_launched, n = n)
},
#' @description Get workers that may still be booting up.
#' @details A worker is "booting" if its launch time is within the last
#' `seconds_launch` seconds. `seconds_launch` is a configurable grace
#' period when `crew` allows a worker to start up and connect to the
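#' `mirai` dispatcher. The `booting()` function does not know about the
#' actual worker connection status; it only knows about launch times,
#' so it may return `TRUE` for workers that have already connected
#' and started doing tasks.
#' @return Logical vector with `TRUE` for workers still inside their
#'   `seconds_launch` window and `FALSE` for all others.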
booting = function() {
  # Length of the startup grace period, in seconds.
  bound <- self$seconds_launch
  # Recorded launch time of each worker; NA means no launch time is recorded.
  start <- self$workers$start
  # Current clock reading divided by 1000 so it is on the same scale as
  # `start` (presumably milliseconds to seconds; see nanonext::mclock()).
  now <- nanonext::mclock() / 1000
  # Logical vector, one element per worker: TRUE while the worker has a
  # launch time and is still inside the grace period. This is returned as a
  # plain expression so the result is visible to callers; ending on an
  # assignment to an unused local made the return value invisible.
  !is.na(start) & ((now - start) < bound)
},
#' @description Get active workers.
#' @details A worker is "active" if its current instance is online and
#' connected, or if it is within its booting time window
#' and has never connected.
#' In other words, "active" means `online | (!discovered & booting)`.
#' @return Logical vector with `TRUE` for active workers and `FALSE` for
#' inactive ones.
active = function() {
  # Workers still inside their startup grace period.
  in_boot_window <- self$booting()
  # Workers that have not yet been discovered on their current websocket.
  never_seen <- !self$workers$discovered
  # Active means online, or undiscovered but still booting.
  self$workers$online | (never_seen & in_boot_window)
},
#' @description Get done workers.
#' @details A worker is "done" if it is launched and inactive.
#' A worker is "launched" if `launch()` was called
#' and the worker websocket has not been rotated since.
#' @return Logical vector with `TRUE` for done workers
#'   (launched and inactive) and `FALSE` for all others.
done = function() {
  # Logical mask of workers that are not active.
  inactive <- !self$active()
  # Done means launched and no longer active; one logical value per worker.
  inactive & self$workers$launched
},
#' @details Rotate the websockets of all done workers,
#'   i.e. workers that are launched and inactive.
#' @return `NULL` (invisibly).
rotate = function() {
  # Visit every worker currently flagged as done (launched and inactive).
  for (index in which(self$done())) {
    # Request a new websocket for this worker slot.
    socket <- mirai::saisei(i = index, force = FALSE, .compute = self$name)
    # No socket came back, so leave this slot untouched.
    if (is.null(socket)) {
      next
    }
    # Terminate the worker behind the old websocket, unless the stored
    # handle is the crew null placeholder.
    handle <- self$workers$handle[[index]]
    if (!is_crew_null(handle)) {
      self$terminate_worker(handle)
    }
    # Record the new socket and mark the slot as available for relaunch.
    self$workers$socket[index] <- socket
    self$workers$launched[index] <- FALSE
  }
},
#' @description Launch a worker.
#' @return `NULL` (invisibly).
#' @param index Positive integer of length 1, index of the worker
Expand Down Expand Up @@ -512,7 +532,7 @@ crew_class_launcher <- R6::R6Class(
"launch_max above",
self$launch_max,
"or troubleshoot your platform to figure out",
"why {crew} workers are not launching or connecting."
"why {crew} workers are not booting up or connecting."
)
)
handle <- self$launch_worker(
Expand Down Expand Up @@ -559,7 +579,7 @@ crew_class_launcher <- R6::R6Class(
return(invisible())
}
self$tally()
walk(x = self$done(), f = self$rotate)
self$rotate()
unlaunched <- self$unlaunched(n = Inf)
active <- nrow(self$workers) - length(unlaunched)
deficit <- min(length(unlaunched), max(0L, demand - active))
Expand Down
1 change: 1 addition & 0 deletions inst/WORDLIST
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ ropensci
scm
SHA
SHA1
unlaunched
snapshotted
L'Ecuyer
traceback
Expand Down
136 changes: 87 additions & 49 deletions man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions man/crew_class_launcher_local.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tests/testthat/test-crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,9 @@ crew_test("crew_controller_local() can terminate a lost worker", {
x$launcher$workers$socket[1L] <- x$client$summary()$socket
x$launcher$workers$start[1L] <- - Inf
x$launcher$workers$launches[1L] <- 1L
x$launcher$workers$launched[1L] <- TRUE
expect_true(handle$is_alive())
x$launcher$rotate(index = 1L)
x$launcher$rotate()
crew_retry(
~!handle$is_alive(),
seconds_interval = 0.1,
Expand Down
6 changes: 3 additions & 3 deletions tests/testthat/test-crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ crew_test("launcher done()", {
expect_equal(launcher$workers$assigned, rep(0L, nrow(grid)))
expect_equal(launcher$workers$complete, rep(0L, nrow(grid)))
launcher$tally(daemons = daemons)
out <- launcher$done()
out <- which(launcher$done())
exp <- c(1L, 2L, 3L, 4L, 7L, 8L, 9L, 10L, 11L, 12L)
expect_equal(out, exp)
launcher$workers$launched <- rep(FALSE, nrow(grid))
launcher$tally(daemons = daemons)
expect_equal(launcher$done(), integer(0L))
expect_equal(which(launcher$done()), integer(0L))
})

crew_test("launcher tally()", {
Expand Down Expand Up @@ -346,7 +346,7 @@ crew_test("custom launcher", {
seconds_timeout = 5
)
expect_false(handle$is_alive())
walk(x = controller$launcher$done(), f = controller$launcher$rotate)
controller$launcher$rotate()
controller$launcher$tally()
out <- controller$launcher$summary()
expect_equal(out$launches, 1L)
Expand Down
2 changes: 1 addition & 1 deletion tests/throughput/test-backlog-seconds_idle.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ controller <- crew_controller_local(
controller$start()
names <- character(0L)
index <- 0L
n_tasks <- 6000L
n_tasks <- 60000L
system.time(
while (index < n_tasks || !(controller$empty())) {
if (index < n_tasks) {
Expand Down

0 comments on commit 42e0d8e

Please sign in to comment.