Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 20, 2024
1 parent e4c5447 commit a5aec04
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
7 changes: 4 additions & 3 deletions crates/polars-arrow/src/mmap/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,23 @@ fn mmap_binview<T: AsRef<[u8]>>(

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

let views = get_buffer::<u128>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
let views = get_buffer::<u128>(data_ref, block_offset, buffers, num_rows)?;

let n_variadic = variadic_buffer_counts
.pop_front()
.ok_or_else(|| polars_err!(ComputeError: "expected variadic_buffer_count"))?;

let mut buffer_ptrs = Vec::with_capacity(n_variadic + 2);
buffer_ptrs.push(validity);
buffer_ptrs.push(Some(views));
buffer_ptrs.push(Some(views.as_ptr()));

let mut variadic_buffer_sizes = Vec::with_capacity(n_variadic);
for _ in 0..n_variadic {
let variadic_buffer = get_bytes(data_ref, block_offset, buffers)?;
variadic_buffer_sizes.push(variadic_buffer.len());
variadic_buffer_sizes.push(variadic_buffer.len() as i64);
buffer_ptrs.push(Some(variadic_buffer.as_ptr()));
}
buffer_ptrs.push(Some(variadic_buffer_sizes.as_ptr().cast::<u8>()));

// Move the variadic buffer sizes into an Arc so that they stay alive.
let data = Arc::new((data, variadic_buffer_sizes));
Expand Down
13 changes: 12 additions & 1 deletion py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3255,6 +3255,8 @@ def write_ipc(
self,
file: None,
compression: IpcCompression = "uncompressed",
*,
future: bool = False,
) -> BytesIO:
...

Expand All @@ -3263,13 +3265,17 @@ def write_ipc(
self,
file: BinaryIO | BytesIO | str | Path,
compression: IpcCompression = "uncompressed",
*,
future: bool = False,
) -> None:
...

def write_ipc(
self,
file: BinaryIO | BytesIO | str | Path | None,
compression: IpcCompression = "uncompressed",
*,
future: bool = False,
) -> BytesIO | None:
"""
Write to Arrow IPC binary stream or Feather file.
Expand All @@ -3283,6 +3289,11 @@ def write_ipc(
written. If set to `None`, the output is returned as a BytesIO object.
compression : {'uncompressed', 'lz4', 'zstd'}
Compression method. Defaults to "uncompressed".
future
WARNING: this argument is unstable and will be removed without it being
considered a breaking change.
Setting this to `True` will write Polars' internal data structures, which
might not be supported by other Arrow implementations.
Examples
--------
Expand All @@ -3307,7 +3318,7 @@ def write_ipc(
if compression is None:
compression = "uncompressed"

self._df.write_ipc(file, compression)
self._df.write_ipc(file, compression, future)
return file if return_bytes else None # type: ignore[return-value]

@overload
Expand Down
3 changes: 3 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,12 +665,14 @@ impl PyDataFrame {
py: Python,
py_f: PyObject,
compression: Wrap<Option<IpcCompression>>,
future: bool,
) -> PyResult<()> {
if let Ok(s) = py_f.extract::<&str>(py) {
let f = std::fs::File::create(s)?;
py.allow_threads(|| {
IpcWriter::new(f)
.with_compression(compression.0)
.with_pl_flavor(future)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)
})?;
Expand All @@ -679,6 +681,7 @@ impl PyDataFrame {

IpcWriter::new(&mut buf)
.with_compression(compression.0)
.with_pl_flavor(future)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)?;
}
Expand Down
10 changes: 9 additions & 1 deletion py-polars/tests/unit/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def test_ipc_column_order(stream: bool) -> None:

@pytest.mark.write_disk()
def test_glob_ipc(df: pl.DataFrame, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
file_path = tmp_path / "small.ipc"
df.write_ipc(file_path)

Expand All @@ -208,3 +207,12 @@ def test_from_float16() -> None:
pandas_df.to_feather(f)
f.seek(0)
assert pl.read_ipc(f, use_pyarrow=False).dtypes == [pl.Float32]


@pytest.mark.write_disk()
def test_binview_ipc_mmap(tmp_path: Path) -> None:
    """Round-trip a string column through a memory-mapped IPC file.

    The frame holds short strings, longer repeated strings, and a null. It is
    written with `future=True` and read back with `memory_map=True`; the result
    must equal the original frame.
    """
    ipc_path = tmp_path / "dump.ipc"
    values = ["aa" * 10, "bb", None, "small", "big" * 20]
    expected = pl.DataFrame({"foo": values})

    # Write to disk first so that the read below can memory-map the file.
    expected.write_ipc(ipc_path, future=True)

    assert_frame_equal(expected, pl.read_ipc(ipc_path, memory_map=True))

0 comments on commit a5aec04

Please sign in to comment.