diff --git a/r/R/ipc.R b/r/R/ipc.R index 8c6a66741..721e1b238 100644 --- a/r/R/ipc.R +++ b/r/R/ipc.R @@ -243,3 +243,25 @@ guess_zip_filename <- function(x) { files } + +# The C-level R_tryCatch() does not provide for handling interrupts (or +# I couldn't figure out how to make it work), so instead we provide wrappers +# around readBin() and writeBin() that convert interrupt conditions to errors +# (which the C code does know how to handle). +read_bin_wrapper <- function(con, what, n) { + withCallingHandlers( + readBin(con, what, n), + interrupt = function(e) { + stop("user interrupt") + } + ) +} + +write_bin_wrapper <- function(object, con) { + withCallingHandlers( + writeBin(object, con), + interrupt = function(e) { + stop("user interrupt") + } + ) +} diff --git a/r/src/ipc.c b/r/src/ipc.c index e2703b128..4e7f84479 100644 --- a/r/src/ipc.c +++ b/r/src/ipc.c @@ -148,7 +148,7 @@ static SEXP call_readbin(void* hdata) { SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes)); SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, nanoarrow_ptype_raw, n)); - SEXP result = PROTECT(Rf_eval(call, R_BaseEnv)); + SEXP result = PROTECT(Rf_eval(call, nanoarrow_ns_pkg)); R_xlen_t bytes_read = Rf_xlength(result); memcpy(data->buf, RAW(result), bytes_read); *(data->size_read_out) = bytes_read; @@ -160,13 +160,13 @@ static SEXP call_readbin(void* hdata) { static SEXP call_writebin(void* hdata) { struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata; - // Write 1MB chunks - int64_t chunk_buffer_size = 1048576; + // Write 16MB chunks + int64_t chunk_buffer_size = 16777216; SEXP chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, chunk_buffer_size)); SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con)); while (data->buf_size_bytes > chunk_buffer_size) { memcpy(RAW(chunk_buffer), data->buf, chunk_buffer_size); - Rf_eval(call, R_BaseEnv); + Rf_eval(call, nanoarrow_ns_pkg); data->buf_size_bytes -= chunk_buffer_size; data->buf += chunk_buffer_size; } @@ -178,7 +178,7 @@ static SEXP call_writebin(void* hdata) { chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, data->buf_size_bytes)); call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con)); memcpy(RAW(chunk_buffer), data->buf, data->buf_size_bytes); - Rf_eval(call, R_BaseEnv); + Rf_eval(call, nanoarrow_ns_pkg); UNPROTECT(2); } diff --git a/r/src/util.c b/r/src/util.c index 7bce0c1dd..56d9d05a7 100644 --- a/r/src/util.c +++ b/r/src/util.c @@ -43,8 +43,8 @@ void nanoarrow_init_cached_sexps(void) { nanoarrow_cls_schema = PROTECT(Rf_mkString("nanoarrow_schema")); nanoarrow_cls_array_stream = PROTECT(Rf_mkString("nanoarrow_array_stream")); nanoarrow_cls_buffer = PROTECT(Rf_mkString("nanoarrow_buffer")); - nanoarrow_sym_readbin = PROTECT(Rf_install("readBin")); - nanoarrow_sym_writebin = PROTECT(Rf_install("writeBin")); + nanoarrow_sym_readbin = PROTECT(Rf_install("read_bin_wrapper")); + nanoarrow_sym_writebin = PROTECT(Rf_install("write_bin_wrapper")); nanoarrow_ptype_raw = PROTECT(Rf_allocVector(RAWSXP, 0)); R_PreserveObject(nanoarrow_ns_pkg);