Skip to content

Commit

Permalink
add export to stream
Browse files Browse the repository at this point in the history
  • Loading branch information
sorhawell committed Jul 19, 2023
1 parent 278884e commit b6cc69d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/rust/src/arrow_interop/to_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub fn new_arrow_stream_internal() -> Robj {
// r-polars as consumer 2: recieve to pointer to own stream, which producer has exported to. Consume it. Return Series.
pub fn arrow_stream_to_s_internal(robj_str: Robj) -> RResult<pl::Series> {
// reclaim ownership of leaked box, and then drop/release it when consumed.
let us = crate::utils::robj_str_ptr_to_usize(robj_str)?;
let us = crate::utils::robj_str_ptr_to_usize(&robj_str)?;
let boxed_stream = unsafe { Box::from_raw(us as *mut ffi::ArrowArrayStream) };

//consume stream and produce a r-polars Series return as Robj
Expand Down Expand Up @@ -183,3 +183,14 @@ fn consume_arrow_stream_to_s(boxed_stream: Box<ffi::ArrowArrayStream>) -> RResul
}
Ok(s)
}

unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> {
let stream_ptr =
crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream;
let schema = df.schema().to_arrow();
let data_type = pl::ArrowDataType::Struct(schema.fields);
let field = pl::ArrowField::new("", data_type, false);
let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df));
unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) };
Ok(())
}
2 changes: 1 addition & 1 deletion src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct OwnedDataFrameIterator {
}

impl OwnedDataFrameIterator {
fn new(df: polars::frame::DataFrame) -> Self {
pub fn new(df: polars::frame::DataFrame) -> Self {
let schema = df.schema().to_arrow();
let data_type = DataType::Struct(schema.fields);
let vs = df.get_columns().to_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/rust/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ pub fn collect_hinted_result_rerr<T>(
}

//keep error simple to interface with other libs
pub fn robj_str_ptr_to_usize(robj: Robj) -> RResult<usize> {
pub fn robj_str_ptr_to_usize(robj: &Robj) -> RResult<usize> {
|| -> RResult<usize> {
let str: &str = robj
.as_str()
Expand Down

0 comments on commit b6cc69d

Please sign in to comment.