Skip to content

Commit

Permalink
feat: support mmap for binview in OOC (pola-rs#13872)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and r-brink committed Jan 24, 2024
1 parent 84f6771 commit 8f121c9
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 10 deletions.
1 change: 0 additions & 1 deletion crates/polars-arrow/src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ unsafe fn create_buffer<T: NativeType>(
}

let offset = buffer_offset(array, data_type, index);
dbg!(offset, len);
let ptr: *mut T = get_buffer_ptr(array, data_type, index)?;

// We have to check alignment.
Expand Down
7 changes: 4 additions & 3 deletions crates/polars-arrow/src/mmap/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,23 @@ fn mmap_binview<T: AsRef<[u8]>>(

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

let views = get_buffer::<u128>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
let views = get_buffer::<u128>(data_ref, block_offset, buffers, num_rows)?;

let n_variadic = variadic_buffer_counts
.pop_front()
.ok_or_else(|| polars_err!(ComputeError: "expected variadic_buffer_count"))?;

let mut buffer_ptrs = Vec::with_capacity(n_variadic + 2);
buffer_ptrs.push(validity);
buffer_ptrs.push(Some(views));
buffer_ptrs.push(Some(views.as_ptr()));

let mut variadic_buffer_sizes = Vec::with_capacity(n_variadic);
for _ in 0..n_variadic {
let variadic_buffer = get_bytes(data_ref, block_offset, buffers)?;
variadic_buffer_sizes.push(variadic_buffer.len());
variadic_buffer_sizes.push(variadic_buffer.len() as i64);
buffer_ptrs.push(Some(variadic_buffer.as_ptr()));
}
buffer_ptrs.push(Some(variadic_buffer_sizes.as_ptr().cast::<u8>()));

// Move variadic buffer sizes in an Arc, so that it stays alive.
let data = Arc::new((data, variadic_buffer_sizes));
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-pipe/src/executors/sinks/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl IOThread {
path.push(format!("{count}.ipc"));

let file = File::create(path).unwrap();
let writer = IpcWriter::new(file).with_pl_flavor(false);
let writer = IpcWriter::new(file).with_pl_flavor(true);
let mut writer = writer.batched(&schema).unwrap();
writer.write_batch(&df).unwrap();
writer.finish().unwrap();
Expand All @@ -163,7 +163,7 @@ impl IOThread {
path.push(format!("{count}.ipc"));

let file = File::create(path).unwrap();
let writer = IpcWriter::new(file).with_pl_flavor(false);
let writer = IpcWriter::new(file).with_pl_flavor(true);
let mut writer = writer.batched(&schema).unwrap();

for df in iter {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl IOThread {
path.push(format!("_{count}.ipc"));

let file = File::create(path).unwrap();
let mut writer = IpcWriter::new(file).with_pl_flavor(false);
let mut writer = IpcWriter::new(file).with_pl_flavor(true);
writer.finish(&mut df).unwrap();
} else {
let iter = Box::new(std::iter::once(df));
Expand Down Expand Up @@ -227,7 +227,7 @@ impl IOThread {
// duplicates
path.push(format!("_{count}.ipc"));
let file = File::create(path).unwrap();
let writer = IpcWriter::new(file).with_pl_flavor(false);
let writer = IpcWriter::new(file).with_pl_flavor(true);
let mut writer = writer.batched(&self.schema).unwrap();
writer.write_batch(&df).unwrap();
writer.finish().unwrap();
Expand Down
13 changes: 12 additions & 1 deletion py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3255,6 +3255,8 @@ def write_ipc(
self,
file: None,
compression: IpcCompression = "uncompressed",
*,
future: bool = False,
) -> BytesIO:
...

Expand All @@ -3263,13 +3265,17 @@ def write_ipc(
self,
file: BinaryIO | BytesIO | str | Path,
compression: IpcCompression = "uncompressed",
*,
future: bool = False,
) -> None:
...

def write_ipc(
self,
file: BinaryIO | BytesIO | str | Path | None,
compression: IpcCompression = "uncompressed",
*,
future: bool = False,
) -> BytesIO | None:
"""
Write to Arrow IPC binary stream or Feather file.
Expand All @@ -3283,6 +3289,11 @@ def write_ipc(
written. If set to `None`, the output is returned as a BytesIO object.
compression : {'uncompressed', 'lz4', 'zstd'}
Compression method. Defaults to "uncompressed".
future
WARNING: this argument is unstable and will be removed without it being
considered a breaking change.
Setting this to `True` will write Polars' internal data structures, which
might not be supported by other Arrow implementations.
Examples
--------
Expand All @@ -3307,7 +3318,7 @@ def write_ipc(
if compression is None:
compression = "uncompressed"

self._df.write_ipc(file, compression)
self._df.write_ipc(file, compression, future)
return file if return_bytes else None # type: ignore[return-value]

@overload
Expand Down
3 changes: 3 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,12 +665,14 @@ impl PyDataFrame {
py: Python,
py_f: PyObject,
compression: Wrap<Option<IpcCompression>>,
future: bool,
) -> PyResult<()> {
if let Ok(s) = py_f.extract::<&str>(py) {
let f = std::fs::File::create(s)?;
py.allow_threads(|| {
IpcWriter::new(f)
.with_compression(compression.0)
.with_pl_flavor(future)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)
})?;
Expand All @@ -679,6 +681,7 @@ impl PyDataFrame {

IpcWriter::new(&mut buf)
.with_compression(compression.0)
.with_pl_flavor(future)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)?;
}
Expand Down
10 changes: 9 additions & 1 deletion py-polars/tests/unit/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def test_ipc_column_order(stream: bool) -> None:

@pytest.mark.write_disk()
def test_glob_ipc(df: pl.DataFrame, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
file_path = tmp_path / "small.ipc"
df.write_ipc(file_path)

Expand All @@ -208,3 +207,12 @@ def test_from_float16() -> None:
pandas_df.to_feather(f)
f.seek(0)
assert pl.read_ipc(f, use_pyarrow=False).dtypes == [pl.Float32]


@pytest.mark.write_disk()
def test_binview_ipc_mmap(tmp_path: Path) -> None:
    """Round-trip a string column through an IPC file read back with memory mapping.

    The frame is written with `future=True` and then loaded with
    `memory_map=True`; the loaded frame must equal the one that was written.
    """
    # A mix of short strings, longer strings and a null value.
    values = ["aa" * 10, "bb", None, "small", "big" * 20]
    expected = pl.DataFrame({"foo": values})

    ipc_file = tmp_path / "dump.ipc"
    expected.write_ipc(ipc_file, future=True)

    roundtripped = pl.read_ipc(ipc_file, memory_map=True)
    assert_frame_equal(expected, roundtripped)

0 comments on commit 8f121c9

Please sign in to comment.