Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background execution and R process pool #311

Merged
merged 79 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
d577bc4
Prototype execution in background thread
Sicheng-Pan Jun 25, 2023
9918a64
Refactor rthreadhandle prototype
Sicheng-Pan Jun 26, 2023
cb41db2
Prototype background R process
Sicheng-Pan Jul 2, 2023
eae0ebc
Made a functional R subprocess!
Sicheng-Pan Jul 2, 2023
9b1ad25
Refactor code for background handler
Sicheng-Pan Jul 3, 2023
18f98c1
Implement R process pool
Sicheng-Pan Jul 4, 2023
f907a22
Merge branch 'main' into lazy_in_background
Sicheng-Pan Jul 4, 2023
fdee6b5
Attempt to (de)serialize series from polars
Sicheng-Pan Jul 4, 2023
4f5886b
Rename R background module file
Sicheng-Pan Jul 4, 2023
fd3a9fb
Implement expr and lf
Sicheng-Pan Jul 5, 2023
4c8e06c
Merge branch 'main' into lazy_in_background
Sicheng-Pan Jul 5, 2023
41f180a
Implement R wrappers for thread handle and global R pool config
Sicheng-Pan Jul 5, 2023
020f902
Use private function to handle request in subprocess
Sicheng-Pan Jul 5, 2023
513fee6
Fix handle display
Sicheng-Pan Jul 6, 2023
717ecd7
Fix error handling in background process
Sicheng-Pan Jul 6, 2023
44799fc
Fix more error handling issues
Sicheng-Pan Jul 6, 2023
5b9ffed
Use shared memory for IPC communication
Sicheng-Pan Jul 11, 2023
6452598
make build export NOT_CRAN=true
sorhawell Jul 11, 2023
c60e596
functions to load and check polars in an R session
sorhawell Jul 11, 2023
49181f7
now it work
sorhawell Jul 11, 2023
ecd7199
update tip to readme
sorhawell Jul 11, 2023
0100441
add test_serde_df add benchmark script
sorhawell Jul 11, 2023
c68a443
Merge branch 'main' into lazy_in_background
Sicheng-Pan Jul 11, 2023
3c1e045
Minor refactor
Sicheng-Pan Jul 11, 2023
07d150d
rename
sorhawell Jul 12, 2023
397c4dd
add build and submit_polars + fix minors
sorhawell Jul 12, 2023
196e548
Merge remote-tracking branch 'origin/not_cran_dev_builds' into lazy_i…
Sicheng-Pan Jul 15, 2023
4557b1b
Implement apply-in-background
Sicheng-Pan Jul 15, 2023
daa90c8
Write unit tests
Sicheng-Pan Jul 15, 2023
78a03c8
Benchmark rbackground
Sicheng-Pan Jul 16, 2023
d990822
add expectations print + is_finished handle
sorhawell Jul 17, 2023
7a02517
add more scenarios
sorhawell Jul 17, 2023
f4b1f0f
Improve display of joined handle
Sicheng-Pan Jul 18, 2023
57def1f
RThreadHandle_is_finished.Rd
sorhawell Jul 18, 2023
3fee2c5
try remove NOT_CRAN
sorhawell Jul 18, 2023
d0758f0
try quote R -e arg
sorhawell Jul 18, 2023
3cbaceb
remove R native pipe operator
sorhawell Jul 18, 2023
5f05fce
try not use arg method
sorhawell Jul 18, 2023
0694664
write failed cmd string in error
sorhawell Jul 18, 2023
4ef435e
drop cmd_string
sorhawell Jul 18, 2023
c7653c2
try add line ending
sorhawell Jul 19, 2023
88d8b52
try not redirect std
sorhawell Jul 19, 2023
88048f1
dunno
sorhawell Jul 19, 2023
3d81fcf
Revert "RThreadHandle_is_finished.Rd"
sorhawell Jul 19, 2023
0781242
revert experiments
sorhawell Jul 19, 2023
65bb48f
Try std::env::current_exe
Sicheng-Pan Jul 20, 2023
ca17fa3
Merge remote-tracking branch 'origin/main' into lazy_in_background
Sicheng-Pan Jul 23, 2023
b03de39
Update extendr
Sicheng-Pan Jul 23, 2023
00c2b4d
Try environment variable
Sicheng-Pan Jul 23, 2023
97b2fb9
change low io high cpu example
sorhawell Jul 23, 2023
980abe8
update benchmark
sorhawell Jul 24, 2023
bd15917
improve Expr_map doc
sorhawell Jul 24, 2023
d21a881
collect_in_background docs
sorhawell Jul 24, 2023
75d0252
update oxygen
sorhawell Jul 24, 2023
c2b6237
super minor, Rctx::Handled
sorhawell Jul 24, 2023
0dee41a
Track R environment in Rust
Sicheng-Pan Jul 25, 2023
d4dbb5f
Remove old PolarsBackgroundHandler
Sicheng-Pan Jul 27, 2023
d9e2dab
chore rextendr 0.3.1.9000 nanoarrow .Rd
sorhawell Jul 27, 2023
388297a
avoid Rctx.into() -> RpolarsErr
sorhawell Jul 27, 2023
87adf0d
Merge apply_in_background to apply
Sicheng-Pan Jul 30, 2023
a2c30ae
Improve background error handling
Sicheng-Pan Jul 30, 2023
13bd3c7
make: redefine install, add install to docs
sorhawell Jul 30, 2023
d0f1765
merge main
sorhawell Aug 1, 2023
ecd6a7c
Merge branch 'main' into lazy_in_background
sorhawell Aug 1, 2023
bc4bb4c
more docs + fix RBackgroundPool
sorhawell Aug 1, 2023
08d8eab
impl thread queue in RBackGroundPool
sorhawell Aug 2, 2023
4fea62d
spawn processes in paralell
sorhawell Aug 2, 2023
2777c94
add test 3d to benchmark
sorhawell Aug 2, 2023
95122a5
increase default pool size to 4
sorhawell Aug 2, 2023
bc12766
update unit test
sorhawell Aug 2, 2023
1cdefe4
add parallel examples to map and apply
sorhawell Aug 7, 2023
1616095
merge main + update docs
sorhawell Aug 7, 2023
90d8be7
merge main + update news + roxygen
sorhawell Aug 8, 2023
afee8a9
polish news [skip ci]
sorhawell Aug 8, 2023
61cc469
add links
sorhawell Aug 8, 2023
8af927f
try rename all-features to full features
sorhawell Aug 8, 2023
46058db
fmt
sorhawell Aug 8, 2023
f352e0b
merge main + solve conflicts + docs + fix a utests
sorhawell Aug 9, 2023
b6916e4
fmt + roxygen
sorhawell Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,11 @@ requirements-rs:
.PHONY: build
build: ## Compile polars R package with all features and generate Rd files
export RPOLARS_ALL_FEATURES=true \
&& export NOT_CRAN=true \
&& export RPOLARS_PROFILE=release-optimized \
&& Rscript -e 'if (!(require(arrow)&&require(nanoarrow))) warning("could not load arrow/nanoarrow, igonore changes to nanoarrow.Rd"); rextendr::document()'

.PHONY: install
install:
export RPOLARS_ALL_FEATURES=true \
&& export NOT_CRAN=true \
&& export RPOLARS_PROFILE=release-optimized \
&& R CMD INSTALL --no-multiarch --with-keep.source .

.PHONY: all
Expand Down
101 changes: 88 additions & 13 deletions R/rbackground.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,110 @@
}


#' @title RThreadHandle to string
#' @description get description of RThreadHandle as string
#' @param x RThreadHandle
#' @param ... not used
#' @export
#' @noRd
#' @keywords internal
as.character.RThreadHandle = function(x, ...) {
.pr$RThreadHandle$thread_description(x) |>
unwrap_or("An exhausted thread handle")
unwrap_or("An exhausted RThreadHandle")
}


#' s3 method print RThreadHandle
#'
#' @keywords internal
#' @param x RThreadHandle
#' @param ... not used
#'
#' @return self
#' @export
#' @noRd
#'
#' @examples
#' handle = pl$LazyFrame()$select(pl$lit(2) + 2)$collect_in_background()
#' print(handle)
#' handle$join()
#' print(handle)
print.RThreadHandle = function(x, ...) as.character(x) |> cat("\n")


#' @title Wait for the thread to complete its job
#' @title The RThreadHandle class
#' @name RThreadHandle_RThreadHandle_class
#' @description A handle to some polars query running in a background thread.
#' @details
#' `<LazyFrame>$collect_in_background()` will execute a polars query detached from the R session
#' and return a `RThreadHandle` immediately. The RThreadHandle has the methods `is_finished()` and
#' `$join()`.
#'
#' The background thread may access the pool of extra R sessions to process R code
#' embedded in polars query via `$map(...,background = TRUE)` or `$apply(background=TRUE)`. Use
#' `pl$set_global_rpool_cap()` to limit number of parallel R sessions. Extra R sessions are spawned
#' and used if `background` arg is set to TRUE.
#'
#' Starting polars `<LazyFrame>$collect_in_background()` with e.g. some
#' `$map(...,background = FALSE)` will raise an Error as the main R session is not available to
#' process the query.
#' @return see methods
#' @keywords RThreadHandle
RThreadHandle

#' Join a RThreadHandle
#' @keywords RThreadHandle
#' @param ... a RThreadHandle
#' @return a DataFrame, which is the result of the job
#' @details method `<RThreadHandle>$join()`: will block until job is done and then return some value
#' or raise an error from the thread.
#' Calling `<RThreadHandle>$join()` a second time will raise an error because handle is already
#' exhausted.
#' @export
RThreadHandle_join = function(...) {
RThreadHandle_join = function() {
.pr$RThreadHandle$join(self) |> unwrap()
}


#' @title Check if the thread completes its job
#' Ask if RThreadHandle is finished?
#' @keywords RThreadHandle
#' @param ... a RThreadHandle
#' @return a boolean indicating the whether the job has finished
#' or NULL if the handle has been joined
#' @details method `<RThreadHandle>$is_finished()`: Calling `<RThreadHandle>$is_finished()` returns
#' trinary value: `TRUE` if finished, `FALSE` if not, and `NULL` if the handle was exhausted
#' (already joined).
#' @export
RThreadHandle_is_finished = function(...) {
RThreadHandle_is_finished = function() {
.pr$RThreadHandle$is_finished(self) |> unwrap_or(NULL)
}




#' get/set global R session pool cap
#' @name global_rpool_cap
#' @param n integer, the capacity limit R sessions to process R code.
#' @return for `pl$get_global_rpool_cap()` a list(available = ? , capacity = ?)
#' where available is how many R session already spawned in pool. Capacity is the limit of
#' how many new R sessions to spawn. Anytime a polars thread worker needs a background R session
#' specifically to run R code embedded in a query via `$map(..., in_background = TRUE)` or
#' `$apply(..., in_background = TRUE)` it will obtain any R session idling in rpool, otherwise spawn
#' a new R session (process) and add it to pool if not `capacity` has been reached. If capacity has
#' been reached already the thread worker will sleep until an R session is idling.
#'
#' Background R sessions communicate via polars arrow IPC (series/vectors) or R serialize +
#' shared memory buffers via the rust crate `ipc-channel`. Multi-process communication has overhead
#' because all data must be serialized/de-serialized and sent via buffers. Using multiple R sessions
#' will likely only give a speed-up in a `low io - high cpu` scenario. Native polars query syntax
#' runs in threads and have no overhead.
#'
#' @examples
#' default = pl$get_global_rpool_cap()
#' print(default)
#' pl$set_global_rpool_cap(8)
#' pl$get_global_rpool_cap()
#' pl$set_global_rpool_cap(default$capacity)
pl$get_global_rpool_cap = function() {
get_global_rpool_cap() |> unwrap()
}

#' @rdname global_rpool_cap
#' @name set_global_rpool_cap
pl$set_global_rpool_cap = function(n) {
set_global_rpool_cap(n) |> unwrap() |> invisible()
}


4 changes: 1 addition & 3 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ replace_private_with_pub_methods(Series, "^Series_")
# RThreadHandle
replace_private_with_pub_methods(RThreadHandle, "^RThreadHandle_")

# Global R process pool configuration
pl$get_global_rpool_cap = function() get_global_rpool_cap() |> unwrap()
pl$set_global_rpool_cap = function(c) set_global_rpool_cap(c) |> unwrap() |> invisible()



# expression constructors
Expand Down
3 changes: 2 additions & 1 deletion docs/make-docs.R
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ make_doc_hierarchy = function() {
out = list()
# order determines order in sidebar
classes = c("pl", "Series", "DataFrame", "LazyFrame", "GroupBy",
"LazyGroupBy", "arr", "ExprBin", "ExprDT", "ExprMeta", "ExprStr", "ExprStruct", "Expr")
"LazyGroupBy", "arr", "ExprBin", "ExprDT", "ExprMeta", "ExprStr", "ExprStruct",
"Expr", "RThreadHandle")
for (cl in classes) {
files = grep(paste0("^", cl, "_"), other, value = TRUE)
tmp = sprintf("%s: reference/%s", sub("\\.md", "", sub("[^_]*_", "", files)), files)
Expand Down
5 changes: 3 additions & 2 deletions inst/misc/benchmark_rbackground.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ large_compute_bg$collect_in_background()$join() |> system.time()

### 3a ----------- Use R processes in parallel, low io, low cpu
lf <- pl$LazyFrame(lapply(1:100,\(i) rep(i,5)))
f_sum_all_cols <- \(lf,...) lf$select(pl$all()$map(\(x) {x$to_r() |> sum()},...))
f_sum_all_cols <- \(lf,...) lf$select(pl$all()$map(\(x) {print("hey");x$to_r() |> sum()},...))

f_sum_all_cols(lf)$collect() |> system.time()

pl$set_global_rpool_cap(1)
f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time()

pl$set_global_rpool_cap(4)

pl$set_global_rpool_cap(8)
f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time()

Expand Down
26 changes: 26 additions & 0 deletions man/RThreadHandle_RThreadHandle_class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 8 additions & 10 deletions man/RThreadHandle_is_finished.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions man/RThreadHandle_join.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions man/as.character.RThreadHandle.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions man/global_rpool_cap.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions man/nanoarrow.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions man/print.RThreadHandle.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading