diff --git a/crates/polars-arrow/src/ffi/array.rs b/crates/polars-arrow/src/ffi/array.rs index 9d9ef9ef44b3..1e6581ee7550 100644 --- a/crates/polars-arrow/src/ffi/array.rs +++ b/crates/polars-arrow/src/ffi/array.rs @@ -280,7 +280,6 @@ unsafe fn create_buffer( } let offset = buffer_offset(array, data_type, index); - dbg!(offset, len); let ptr: *mut T = get_buffer_ptr(array, data_type, index)?; // We have to check alignment. diff --git a/crates/polars-arrow/src/mmap/array.rs b/crates/polars-arrow/src/mmap/array.rs index 6d5ec564cac9..b6e75364df20 100644 --- a/crates/polars-arrow/src/mmap/array.rs +++ b/crates/polars-arrow/src/mmap/array.rs @@ -139,7 +139,7 @@ fn mmap_binview>( let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - let views = get_buffer::(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + let views = get_buffer::(data_ref, block_offset, buffers, num_rows)?; let n_variadic = variadic_buffer_counts .pop_front() @@ -147,14 +147,15 @@ fn mmap_binview>( let mut buffer_ptrs = Vec::with_capacity(n_variadic + 2); buffer_ptrs.push(validity); - buffer_ptrs.push(Some(views)); + buffer_ptrs.push(Some(views.as_ptr())); let mut variadic_buffer_sizes = Vec::with_capacity(n_variadic); for _ in 0..n_variadic { let variadic_buffer = get_bytes(data_ref, block_offset, buffers)?; - variadic_buffer_sizes.push(variadic_buffer.len()); + variadic_buffer_sizes.push(variadic_buffer.len() as i64); buffer_ptrs.push(Some(variadic_buffer.as_ptr())); } + buffer_ptrs.push(Some(variadic_buffer_sizes.as_ptr().cast::())); // Move variadic buffer sizes in an Arc, so that it stays alive. let data = Arc::new((data, variadic_buffer_sizes)); diff --git a/crates/polars-pipe/src/executors/sinks/io.rs b/crates/polars-pipe/src/executors/sinks/io.rs index f32adc93fe17..096326d10c0a 100644 --- a/crates/polars-pipe/src/executors/sinks/io.rs +++ b/crates/polars-pipe/src/executors/sinks/io.rs @@ -152,7 +152,7 @@ impl IOThread { path.push(format!("{count}.ipc")); let file = File::create(path).unwrap(); - let writer = IpcWriter::new(file).with_pl_flavor(false); + let writer = IpcWriter::new(file).with_pl_flavor(true); let mut writer = writer.batched(&schema).unwrap(); writer.write_batch(&df).unwrap(); writer.finish().unwrap(); @@ -163,7 +163,7 @@ impl IOThread { path.push(format!("{count}.ipc")); let file = File::create(path).unwrap(); - let writer = IpcWriter::new(file).with_pl_flavor(false); + let writer = IpcWriter::new(file).with_pl_flavor(true); let mut writer = writer.batched(&schema).unwrap(); for df in iter { @@ -199,7 +199,7 @@ impl IOThread { path.push(format!("_{count}.ipc")); let file = File::create(path).unwrap(); - let mut writer = IpcWriter::new(file).with_pl_flavor(false); + let mut writer = IpcWriter::new(file).with_pl_flavor(true); writer.finish(&mut df).unwrap(); } else { let iter = Box::new(std::iter::once(df)); @@ -227,7 +227,7 @@ impl IOThread { // duplicates path.push(format!("_{count}.ipc")); let file = File::create(path).unwrap(); - let writer = IpcWriter::new(file).with_pl_flavor(false); + let writer = IpcWriter::new(file).with_pl_flavor(true); let mut writer = writer.batched(&self.schema).unwrap(); writer.write_batch(&df).unwrap(); writer.finish().unwrap(); diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index cfdc3f096301..220a0b5edf3a 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3255,6 +3255,8 @@ def write_ipc( self, file: None, compression: IpcCompression = "uncompressed", + *, + future: bool = False, ) -> BytesIO: ... @@ -3263,6 +3265,8 @@ def write_ipc( self, file: BinaryIO | BytesIO | str | Path, compression: IpcCompression = "uncompressed", + *, + future: bool = False, ) -> None: ... @@ -3270,6 +3274,8 @@ def write_ipc( self, file: BinaryIO | BytesIO | str | Path | None, compression: IpcCompression = "uncompressed", + *, + future: bool = False, ) -> BytesIO | None: """ Write to Arrow IPC binary stream or Feather file. @@ -3283,6 +3289,11 @@ def write_ipc( written. If set to `None`, the output is returned as a BytesIO object. compression : {'uncompressed', 'lz4', 'zstd'} Compression method. Defaults to "uncompressed". + future + WARNING: this argument is unstable and will be removed without it being + considered a breaking change. + Setting this to `True` will write polars' internal data-structures that + might not be available by other Arrow implementations. Examples -------- @@ -3307,7 +3318,7 @@ def write_ipc( if compression is None: compression = "uncompressed" - self._df.write_ipc(file, compression) + self._df.write_ipc(file, compression, future) return file if return_bytes else None # type: ignore[return-value] @overload diff --git a/py-polars/src/dataframe.rs b/py-polars/src/dataframe.rs index 3b12af1638a1..39f0aef1f9d3 100644 --- a/py-polars/src/dataframe.rs +++ b/py-polars/src/dataframe.rs @@ -665,12 +665,14 @@ impl PyDataFrame { py: Python, py_f: PyObject, compression: Wrap>, + future: bool, ) -> PyResult<()> { if let Ok(s) = py_f.extract::<&str>(py) { let f = std::fs::File::create(s)?; py.allow_threads(|| { IpcWriter::new(f) .with_compression(compression.0) + .with_pl_flavor(future) .finish(&mut self.df) .map_err(PyPolarsErr::from) })?; @@ -679,6 +681,7 @@ impl PyDataFrame { IpcWriter::new(&mut buf) .with_compression(compression.0) + .with_pl_flavor(future) .finish(&mut self.df) .map_err(PyPolarsErr::from)?; } diff --git a/py-polars/tests/unit/io/test_ipc.py b/py-polars/tests/unit/io/test_ipc.py index 94c0a5dc2424..d3ef4f5cb16b 100644 --- a/py-polars/tests/unit/io/test_ipc.py +++ b/py-polars/tests/unit/io/test_ipc.py @@ -188,7 +188,6 @@ def test_ipc_column_order(stream: bool) -> None: @pytest.mark.write_disk() def test_glob_ipc(df: pl.DataFrame, tmp_path: Path) -> None: - tmp_path.mkdir(exist_ok=True) file_path = tmp_path / "small.ipc" df.write_ipc(file_path) @@ -208,3 +207,12 @@ def test_from_float16() -> None: pandas_df.to_feather(f) f.seek(0) assert pl.read_ipc(f, use_pyarrow=False).dtypes == [pl.Float32] + + +@pytest.mark.write_disk() +def test_binview_ipc_mmap(tmp_path: Path) -> None: + df = pl.DataFrame({"foo": ["aa" * 10, "bb", None, "small", "big" * 20]}) + file_path = tmp_path / "dump.ipc" + df.write_ipc(file_path, future=True) + read = pl.read_ipc(file_path, memory_map=True) + assert_frame_equal(df, read)