map_elements completely breaks after an error #1253

Open
MichalLauer opened this issue Oct 10, 2024 · 7 comments

@MichalLauer

Hello,

I found an interesting bug: map_elements stops working entirely once it has hit an error at any earlier point in the session. This example code works as expected:

library(polars)
#> Warning: package 'polars' was built under R version 4.3.3
pl$DataFrame(mtcars[, 1:4])$select(
  pl$all()$map_elements( \(x) x + 1 )
)$head()
#> shape: (5, 4)
#> ┌──────┬─────┬───────┬───────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    │
#> │ ---  ┆ --- ┆ ---   ┆ ---   │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   │
#> ╞══════╪═════╪═══════╪═══════╡
#> │ 22.0 ┆ 7.0 ┆ 161.0 ┆ 111.0 │
#> │ 22.0 ┆ 7.0 ┆ 161.0 ┆ 111.0 │
#> │ 23.8 ┆ 5.0 ┆ 109.0 ┆ 94.0  │
#> │ 22.4 ┆ 7.0 ┆ 259.0 ┆ 111.0 │
#> │ 19.7 ┆ 9.0 ┆ 361.0 ┆ 176.0 │
#> └──────┴─────┴───────┴───────┘

However, if an error occurred inside map_elements in an earlier evaluation, the identical code stops working.

library(polars)
#> Warning: package 'polars' was built under R version 4.3.3
pl$DataFrame(mtcars[, 1:4])$select(
  pl$all()$map_elements( \(x) stop() ) # Random error that might happen
)
#> Error: Execution halted with the following contexts
#>    0: In R: in $select()
#>    0: During function call [base::tryCatch(base::withCallingHandlers({
#>           NULL
#>           base::saveRDS(base::do.call(base::do.call, base::c(base::readRDS("C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-fun-3e036195c72"), 
#>               base::list(envir = .GlobalEnv, quote = TRUE)), envir = .GlobalEnv, 
#>               quote = TRUE), file = "C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-res-3e03be9166f", 
#>               compress = FALSE)
#>           base::flush(base::stdout())
#>           base::flush(base::stderr())
#>           NULL
#>           base::invisible()
#>       }, error = function(e) {
#>           {
#>               callr_data <- base::as.environment("tools:callr")$`__callr_data__`
#>               err <- callr_data$err
#>               if (FALSE) {
#>                   base::assign(".Traceback", base::.traceback(4), envir = callr_data)
#>                   utils::dump.frames("__callr_dump__")
#>                   base::assign(".Last.dump", .GlobalEnv$`__callr_dump__`, 
#>                       envir = callr_data)
#>                   base::rm("__callr_dump__", envir = .GlobalEnv)
#>               }
#>               e <- err$process_call(e)
#>               e2 <- err$new_error("error in callr subprocess")
#>               class <- base::class
#>               class(e2) <- base::c("callr_remote_error", class(e2))
#>               e2 <- err$add_trace_back(e2)
#>               cut <- base::which(e2$trace$scope == "global")[1]
#>               if (!base::is.na(cut)) {
#>                   e2$trace <- e2$trace[-(1:cut), ]
#>               }
#>               base::saveRDS(base::list("error", e2, e), file = base::paste0("C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-res-3e03be9166f", 
#>                   ".error"))
#>           }
#>       }, interrupt = function(e) {
#>           {
#>               callr_data <- base::as.environment("tools:callr")$`__callr_data__`
#>               err <- callr_data$err
#>               if (FALSE) {
#>                   base::assign(".Traceback", base::.traceback(4), envir = callr_data)
#>                   utils::dump.frames("__callr_dump__")
#>                   base::assign(".Last.dump", .GlobalEnv$`__callr_dump__`, 
#>                       envir = callr_data)
#>                   base::rm("__callr_dump__", envir = .GlobalEnv)
#>               }
#>               e <- err$process_call(e)
#>               e2 <- err$new_error("error in callr subprocess")
#>               class <- base::class
#>               class(e2) <- base::c("callr_remote_error", class(e2))
#>               e2 <- err$add_trace_back(e2)
#>               cut <- base::which(e2$trace$scope == "global")[1]
#>               if (!base::is.na(cut)) {
#>                   e2$trace <- e2$trace[-(1:cut), ]
#>               }
#>               base::saveRDS(base::list("error", e2, e), file = base::paste0("C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-res-3e03be9166f", 
#>                   ".error"))
#>           }
#>       }, callr_message = function(e) {
#>           base::try(base::signalCondition(e))
#>       }), error = function(e) {
#>           NULL
#>           if (FALSE) {
#>               base::try(base::stop(e))
#>           }
#>           else {
#>               base::invisible()
#>           }
#>       }, interrupt = function(e) {
#>           NULL
#>           if (FALSE) {
#>               e
#>           }
#>           else {
#>               base::invisible()
#>           }
#>       })]
#>    1: user function raised an error: EvalError(lang!(function (s) {    s$map_elements(f, return_type, strict_return_type, allow_fail_eval)}, ExternalPtr.set_class(["RPolarsSeries"]))

pl$DataFrame(mtcars[, 1:4])$select(
  pl$all()$map_elements( \(x) x + 1 )
)$head()
#> Error in .pr$DataFrame$select(self, unpack_list(..., .context = "in $select()")): user function panicked: select

The first error is expected. However, the same call that worked before now fails. The only remedy is to restart the R session. I am using the latest polars version.

packageVersion("polars")
#> [1] '0.19.1'

Any guess why that might be? Thanks!

@etiennebacher
Collaborator

Hi, thanks for the report. I can reproduce it, but $map_elements() is tricky, so I don't know how to fix it yet.

@eitsupi
Collaborator

eitsupi commented Oct 12, 2024

According to my observations when I implemented map_batches in the next branch, this may be due to the following:

if let Ok(packet) = any_new_msg {
    let (s, c_tx) = packet;
    let answer = i(s); // handle request with the `i` closure
    let a = answer.map_err(|err| format!("user function raised an error: {:?} \n", err))?;
    c_tx.send(a).unwrap();

The thread will not terminate successfully and will not be able to reconnect thereafter.

It may be possible to fix this by having it terminate when an error occurs, as in the next branch:

let Ok(a) = answer else {
    // TODO: after this error, map_batches may freeze
    ThreadCom::kill_global(conf);
    return Err("User function raised an error".into());
};
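
To make the error path concrete, here is a minimal standalone sketch using only std channels. It is not the r-polars code: `Request`, `serve`, `ask`, and `demo` are hypothetical names, and the closure stands in for the R user function. The idea is just that on failure the server forwards the error to the waiting thread and then stops, so nothing half-alive is left behind.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical request type: an input value plus a channel for the reply.
type Request = (i32, Sender<Result<i32, String>>);

/// Serve requests until one fails. On failure the error is forwarded to the
/// waiting caller and the loop ends, which also drops the receiver.
fn serve(
    requests: Receiver<Request>,
    f: impl Fn(i32) -> Result<i32, String>,
) -> Result<(), String> {
    for (input, reply_tx) in requests {
        match f(input) {
            Ok(value) => {
                // A send error only means the caller has gone away.
                let _ = reply_tx.send(Ok(value));
            }
            Err(err) => {
                let msg = format!("user function raised an error: {}", err);
                let _ = reply_tx.send(Err(msg.clone()));
                return Err(msg);
            }
        }
    }
    Ok(())
}

/// Send one request from a worker thread and wait for the reply.
fn ask(tx: &Sender<Request>, input: i32) -> Result<i32, String> {
    let (reply_tx, reply_rx) = channel();
    tx.send((input, reply_tx))
        .map_err(|_| "server is gone".to_string())?;
    reply_rx
        .recv()
        .map_err(|_| "server dropped the reply channel".to_string())?
}

/// One worker sends a good request and then a failing one, while the
/// calling thread plays the role of the single interpreter.
fn demo() -> (Result<i32, String>, Result<i32, String>, Result<(), String>) {
    let (tx, rx) = channel::<Request>();
    let worker = thread::spawn(move || (ask(&tx, 1), ask(&tx, -1)));
    let served = serve(rx, |x| {
        if x >= 0 {
            Ok(x + 1)
        } else {
            Err("negative input".to_string())
        }
    });
    let (first, second) = worker.join().unwrap();
    (first, second, served)
}
```

Calling `demo()` gives a successful first reply, an error for the second request, and an error result from `serve`. The worker is never left waiting on a reply that will not arrive.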

@sorhawell
Collaborator

sorhawell commented Nov 2, 2024

After a long time away from polars, with small kids and a new job, I was about to give a small introduction to r-polars next week and noticed this bug too. This is obviously very annoying :) I don't think it was always like this, but I cannot prove it.

I can try to take a look at it within the next month.

@sorhawell
Collaborator

sorhawell commented Nov 2, 2024

I see that my use of init_cell is not quite what the Rust crate author intended. If one ThreadCom (the link between the rust-polars threads and the single R session) crashes, it is replaced via kill_global + update_global. My wild guess is that the polars threads in the polars thread pool still link to the crashed global ThreadCom, because init_cell does not support safely mutating the global state. It probably worked in the past, but since this is undefined behavior, it could fail at any point.

rust init_cell

A cell which can be unsafely initialized or interiorly mutated, but safely accessed.

This is mostly intended for use in statics. The cell is safe to access, but must be initialized before any access. There is no synchronization to ensure initialization is observed, so you should initialize at the beginning of the main function or using something like the ctor crate.

The purpose of this global state is to allow new, oblivious threads spawned by rust-polars to look up and clone the currently active, functioning ThreadCom.

chatty_gippity says to try once_cell::sync::Lazy instead:

use once_cell::sync::Lazy;
use std::sync::RwLock;

// Assuming ThreadCom<S, R> is defined elsewhere in your code
pub struct ThreadCom<S, R> {
    // Your fields here
}

// Define the global state.
// Note: a static cannot be generic, so S and R must be concrete types here.
static GLOBAL_STATE: Lazy<RwLock<Option<ThreadCom<S, R>>>> = Lazy::new(|| RwLock::new(None));

Or maybe something completely different ¯\_(ツ)_/¯
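
For reference, a replaceable global of this shape can also be sketched with std alone, since `RwLock::new` is a const fn from Rust 1.63 on. This is only an illustration of the pattern, not the r-polars implementation: `Com` is a hypothetical stand-in for ThreadCom, and `update_global`, `kill_global`, and `clone_global` are toy versions that merely borrow names from the discussion.

```rust
use std::sync::RwLock;

// Hypothetical stand-in for ThreadCom: only an id, so replacements are visible.
#[derive(Clone, Debug, PartialEq)]
pub struct Com {
    pub id: u32,
}

// `RwLock::new` is const (Rust 1.63+), so no lazy initialization is needed.
static GLOBAL_COM: RwLock<Option<Com>> = RwLock::new(None);

/// Register a new communication object, replacing any previous one.
pub fn update_global(com: Com) {
    *GLOBAL_COM.write().unwrap() = Some(com);
}

/// Remove the registered object, e.g. after it crashed.
pub fn kill_global() {
    *GLOBAL_COM.write().unwrap() = None;
}

/// What a newly spawned thread would call to pick up the current object.
pub fn clone_global() -> Option<Com> {
    GLOBAL_COM.read().unwrap().clone()
}
```

A thread that calls `clone_global()` after `kill_global()` followed by `update_global(...)` sees the replacement, because every access goes through the lock. Threads that cloned the old object earlier still hold their stale copy, so a safe global alone does not repair those.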

@eitsupi
Collaborator

eitsupi commented Nov 3, 2024

This is obviously very annoying :) I don't think it was always like this, but I cannot prove it.

It must have been this way for a long time, because it reproduces even in v0.9.0, the oldest version that can be easily installed today. (There, we need to use apply instead of map_elements.)

@sorhawell
Collaborator

sorhawell commented Nov 10, 2024

This bug was not caused by init_cell vs once_cell; swapping to once_cell changed nothing. However, maybe that change should be adopted in another PR for tidiness' sake.

It turns out to be a plain bug in how user errors were handled and how polars state was reset.

If a user map_ function raises an R error, the R interpreter returns directly and does not gracefully shut down the polars query, which includes closing ("killing") the ThreadCom object (the object that lets multiple polars threads share the single R interpreter).

This ThreadCom then survives in the global register (once_cell/init_cell) because there was no graceful shutdown, but it is defunct in the next polars query, hence the bug. If I force the global register to be reset at every polars query, the bug goes away (solution 1).

However, I vaguely remember that this is a problem when a polars query is called within the user function of another polars query. In that case, the inner polars query should not reset the global ThreadCom, as that would sever communication for any other map_ functions.

My candidate (solution 2) is to implicitly wrap any R user function in a tryCatch to ensure a graceful shutdown of polars. This might cost 1-5 ms or so per R user function call.
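
The shape of solution 2 can be shown by analogy in plain Rust, with the caveat that a Rust panic is not an R error and that the real wrapper would live on the R side as a tryCatch. In this sketch, `CHANNEL_OPEN` and `run_guarded` are hypothetical names. The wrapper turns a failure into an ordinary `Err` value, and the shutdown step runs on both the success and the failure path.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical flag standing in for "a communication object is registered".
static CHANNEL_OPEN: AtomicBool = AtomicBool::new(false);

/// Run `user_fn` with the channel open. The channel is always closed
/// afterwards, and a panic is converted into an `Err` instead of escaping.
fn run_guarded<T>(user_fn: impl FnOnce() -> T) -> Result<T, String> {
    CHANNEL_OPEN.store(true, Ordering::SeqCst);
    let outcome = catch_unwind(AssertUnwindSafe(user_fn));
    // Shutdown runs whether the user function succeeded or failed.
    CHANNEL_OPEN.store(false, Ordering::SeqCst);
    outcome.map_err(|_| "user function raised an error".to_string())
}
```

After a failing call, `CHANNEL_OPEN` is false again and a later `run_guarded` call works normally, which is the behavior the wrapper is meant to guarantee.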

Solution 3a: whenever a new polars query recycles a ThreadCom from the global register, it could check once that it works by running a simple function. That might take 1 ms, once only. If it does not work, the query resets it.

Solution 3b: it would be even faster with some 'Rust only' verification of ThreadCom, but then I might have to rewrite some function signatures to allow a non-R request via ThreadCom. Maybe not worth the hassle.

I will look into 3b -> 3a -> 2 or so
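
A rough std-only sketch of the check-then-reset idea behind 3a/3b follows. Everything here is hypothetical: `Com` stands in for ThreadCom, `REGISTRY` for the global register, and the ping is a Rust-only round trip with an arbitrary 100 ms timeout rather than a call into R.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

// A ping request carries the channel on which the server should answer.
type Ping = Sender<()>;

// Hypothetical stand-in for ThreadCom.
#[derive(Clone)]
struct Com {
    tx: Sender<Ping>,
}

// Hypothetical global register holding the currently active Com.
static REGISTRY: Mutex<Option<Com>> = Mutex::new(None);

/// Start a server thread that answers every ping it receives.
fn spawn_server() -> Com {
    let (tx, rx) = channel::<Ping>();
    thread::spawn(move || {
        for reply in rx {
            let _ = reply.send(());
        }
    });
    Com { tx }
}

/// One cheap round trip: false if the server is gone or does not answer in time.
fn is_alive(com: &Com) -> bool {
    let (reply_tx, reply_rx) = channel();
    com.tx.send(reply_tx).is_ok()
        && reply_rx.recv_timeout(Duration::from_millis(100)).is_ok()
}

/// Reuse the registered Com if it still answers, otherwise replace it.
/// The bool reports whether a reset happened.
fn get_or_reset() -> (Com, bool) {
    let mut slot = REGISTRY.lock().unwrap();
    if let Some(com) = slot.as_ref() {
        if is_alive(com) {
            return (com.clone(), false);
        }
    }
    let fresh = spawn_server();
    *slot = Some(fresh.clone());
    (fresh, true)
}
```

The check costs one channel round trip per recycled object. A defunct entry, such as one whose receiver was dropped, is detected on first use and replaced, while a healthy one is reused as is.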

@sorhawell
Collaborator

A cell which can be unsafely initialized or interiorly mutated, but safe
Yikes, I made a big mistake. I found the documentation of the wrong crate, one with a very similar use case and naming.

These are the right docs, and our current use does not seem to be discouraged.
https://docs.rs/state/0.6.0/state/struct.InitCell.html

I should probably revert to InitCell from once_cell::sync::Lazy. The two behave very similarly and are drop-in replacements for each other.
#1295
