Skip to content

Commit

Permalink
handle interrupts
Browse files Browse the repository at this point in the history
  • Loading branch information
paleolimbot committed Sep 15, 2024
1 parent 39b57f7 commit 94a2b43
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
22 changes: 22 additions & 0 deletions r/R/ipc.R
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,25 @@ guess_zip_filename <- function(x) {

files
}

# The C-level R_tryCatch() does not provide for handling interrupts (or
# I couldn't figure out how to make it work), so instead we provide wrappers
# around readBin() and writeBin() that convert interrupt conditions to errors
# (which the C code does know how to handle).
read_bin_wrapper <- function(con, what, n) {
withCallingHandlers(
readBin(con, what, n),
interrupt = function(e) {
stop("user interrupt")
}
)
}

write_bin_wrapper <- function(object, con) {
withCallingHandlers(
writeBin(object, con),
interrupt = function(e) {
stop("user interrupt")
}
)
}
10 changes: 5 additions & 5 deletions r/src/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ static SEXP call_readbin(void* hdata) {
SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes));
SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, nanoarrow_ptype_raw, n));

SEXP result = PROTECT(Rf_eval(call, R_BaseEnv));
SEXP result = PROTECT(Rf_eval(call, nanoarrow_ns_pkg));
R_xlen_t bytes_read = Rf_xlength(result);
memcpy(data->buf, RAW(result), bytes_read);
*(data->size_read_out) = bytes_read;
Expand All @@ -160,13 +160,13 @@ static SEXP call_readbin(void* hdata) {
static SEXP call_writebin(void* hdata) {
struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;

// Write 1MB chunks
int64_t chunk_buffer_size = 1048576;
// Write 16MB chunks
int64_t chunk_buffer_size = 16777216;
SEXP chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, chunk_buffer_size));
SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con));
while (data->buf_size_bytes > chunk_buffer_size) {
memcpy(RAW(chunk_buffer), data->buf, chunk_buffer_size);
Rf_eval(call, R_BaseEnv);
Rf_eval(call, nanoarrow_ns_pkg);
data->buf_size_bytes -= chunk_buffer_size;
data->buf += chunk_buffer_size;
}
Expand All @@ -178,7 +178,7 @@ static SEXP call_writebin(void* hdata) {
chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, data->buf_size_bytes));
call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con));
memcpy(RAW(chunk_buffer), data->buf, data->buf_size_bytes);
Rf_eval(call, R_BaseEnv);
Rf_eval(call, nanoarrow_ns_pkg);
UNPROTECT(2);
}

Expand Down
4 changes: 2 additions & 2 deletions r/src/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ void nanoarrow_init_cached_sexps(void) {
nanoarrow_cls_schema = PROTECT(Rf_mkString("nanoarrow_schema"));
nanoarrow_cls_array_stream = PROTECT(Rf_mkString("nanoarrow_array_stream"));
nanoarrow_cls_buffer = PROTECT(Rf_mkString("nanoarrow_buffer"));
nanoarrow_sym_readbin = PROTECT(Rf_install("readBin"));
nanoarrow_sym_writebin = PROTECT(Rf_install("writeBin"));
nanoarrow_sym_readbin = PROTECT(Rf_install("read_bin_wrapper"));
nanoarrow_sym_writebin = PROTECT(Rf_install("write_bin_wrapper"));
nanoarrow_ptype_raw = PROTECT(Rf_allocVector(RAWSXP, 0));

R_PreserveObject(nanoarrow_ns_pkg);
Expand Down

0 comments on commit 94a2b43

Please sign in to comment.