Flight data retrieved via Python client (wrapping C++) cannot be used by Rust Arrow #6471

Open
EnricoMi opened this issue Sep 27, 2024 · 1 comment · May be fixed by #6472
@EnricoMi (Contributor)

Describe the bug
Flight data retrieved via the Python client (wrapping C++) cannot be used in Rust Arrow:

Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type.
Before importing buffer through FFI, please make sure the allocation is aligned.

Full stack trace:

thread '<unnamed>' panicked at /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-buffer-52.0.0/src/buffer/scalar.rs:138:17:
Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.
stack backtrace:
   0:     0x7f4d25f576ea - <std::sys::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h584e154fdf2d8641
   1:     0x7f4d24a0eb7b - core::fmt::write::h810564c4cb1595da
   2:     0x7f4d25f252a2 - std::io::Write::write_fmt::ha00a1de7318f2a48
   3:     0x7f4d25f5cb29 - std::sys::backtrace::print::h6238e978e425409a
   4:     0x7f4d25f5c316 - std::panicking::default_hook::{{closure}}::h0acffbc0a684bdb8
   5:     0x7f4d25f5d6f5 - std::panicking::rust_panic_with_hook::hcdf40f293c76fc9f
   6:     0x7f4d25f5cec2 - std::panicking::begin_panic_handler::{{closure}}::hfd12f36809a34009
   7:     0x7f4d25f5ce59 - std::sys::backtrace::__rust_end_short_backtrace::h6a9267615b3cf1db
   8:     0x7f4d25f5ce44 - rust_begin_unwind
   9:     0x7f4d24a0d4f2 - core::panicking::panic_fmt::hcabcb14b752ed0b3
  10:     0x7f4d246177b1 - arrow_buffer::buffer::scalar::ScalarBuffer<T>::new::h04fe130fe772026c
  11:     0x7f4d254255a0 - arrow_array::array::get_offsets::h216d5a4c8918fc01
  12:     0x7f4d2438af33 - arrow_array::array::make_array::h2cb6f33b6d6b2c59
  13:     0x7f4d24390613 - <arrow_array::array::struct_array::StructArray as core::convert::From<arrow_data::data::ArrayData>>::from::h07d61a4146018136
  14:     0x7f4d24246b46 - <arrow_array::record_batch::RecordBatch as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::h1af3cfef0e2e6e5a
  15:     0x7f4d23f169cb - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::h46b0b854c3de8071
  16:     0x7f4d240b7375 - <alloc::vec::Vec<T> as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::he22d7662854ef688
  17:     0x7f4d23f1864b - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::hd78ea1387d583d39
  18:     0x7f4d240293e0 - pyo3::impl_::extract_argument::extract_argument::hf1fa544aa176067c
  19:     0x7f4d2413265f - datafusion_python::context::PySessionContext::__pymethod_create_dataframe__::h0938b7fdddde4e81
  20:     0x7f4d24026e61 - pyo3::impl_::trampoline::trampoline::h86dfdb741875b71a
  21:     0x7f4d2412a441 - datafusion_python::context::<impl pyo3::impl_::pyclass::PyMethods<datafusion_python::context::PySessionContext> for pyo3::impl_::pyclass::PyClassImplCollector<datafusion_python::context::PySessionContext>>::py_methods::ITEMS::trampoline::hd70e045974ca5aa8
  22:     0x560f47b75569 - <unknown>
  23:     0x560f47b5cb2b - _PyEval_EvalFrameDefault
  24:     0x560f47b746ac - _PyFunction_Vectorcall
  25:     0x560f47b5c935 - _PyEval_EvalFrameDefault
  26:     0x560f47b59096 - <unknown>
  27:     0x560f47c4ef66 - PyEval_EvalCode
  28:     0x560f47c79e98 - <unknown>
  29:     0x560f47c7379b - <unknown>
  30:     0x560f47c79be5 - <unknown>
  31:     0x560f47c790c8 - _PyRun_SimpleFileObject
  32:     0x560f47c78d13 - _PyRun_AnyFileObject
  33:     0x560f47c6b70e - Py_RunMain
  34:     0x560f47c41dfd - Py_BytesMain
  35:     0x7f4d5a029d90 - __libc_start_call_main
                               at ./csu/../sysdeps/nptl/libc_start_call_main.h:58:16
  36:     0x7f4d5a029e40 - __libc_start_main_impl
                               at ./csu/../csu/libc-start.c:392:3
  37:     0x560f47c41cf5 - _start
  38:                0x0 - <unknown>
Traceback (most recent call last):
  File "/home/enrico/git/arrow-datafusion-issue/example.py", line 41, in <module>
    main(sys.argv[1])
  File "/home/enrico/git/arrow-datafusion-issue/example.py", line 33, in main
    df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
pyo3_runtime.PanicException: Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.

To Reproduce

git clone --depth=1 https://github.com/apache/arrow.git
git clone --depth=1 https://github.com/apache/arrow-testing.git

python -m venv venv
source venv/bin/activate
pip install pyarrow pandas datafusion

# in one terminal: start the example Flight server
python arrow/python/examples/flight/server.py

# in a second terminal: run the reproducer against it
RUST_BACKTRACE=1 python example.py arrow-testing/data/csv/aggregate_test_100.csv

with example.py:

import sys

import datafusion
import pyarrow
import pyarrow.flight
import pyarrow.csv as csv


def push_data(client, path):
    # upload a single column of the CSV file to the Flight server
    my_table = csv.read_csv(path).select(["c1"])
    writer, _ = client.do_put(pyarrow.flight.FlightDescriptor.for_path("file"), my_table.schema)
    writer.write_table(my_table)
    writer.close()


def get_data(client):
    descriptor = pyarrow.flight.FlightDescriptor.for_path("file")
    info = client.get_flight_info(descriptor)
    for endpoint in info.endpoints:
        for location in endpoint.locations:
            get_client = pyarrow.flight.FlightClient(location)
            reader = get_client.do_get(endpoint.ticket)
            yield reader.to_reader()


def main(path):
    client = pyarrow.flight.FlightClient("grpc+tcp://localhost:5005")
    push_data(client, path)
    partitions = get_data(client)

    ctx = datafusion.SessionContext()
    df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
    print(df)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Provide the path to example CSV file")
        sys.exit(1)
    main(sys.argv[1])
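
The misalignment can be observed from Python before the data ever reaches datafusion. The helper below is a diagnostic sketch (print_buffer_alignment is not part of the reproducer; it reuses the pyarrow import from example.py) that prints the raw address of every buffer in a record batch, so one can check whether any address is not a multiple of the scalar width required by arrow-rs:

def print_buffer_alignment(batch: pyarrow.RecordBatch) -> None:
    # pyarrow exposes the raw address of each underlying buffer; an address
    # that is not a multiple of align_of::<T>() trips the arrow-rs assert
    for name, column in zip(batch.schema.names, batch.columns):
        for buf in column.buffers():
            if buf is not None:
                print(f"{name}: address={buf.address}, address % 8 = {buf.address % 8}")

# for example inside get_data(), after reader = get_client.do_get(endpoint.ticket):
#     for batch in reader.to_reader():
#         print_buffer_alignment(batch)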

The assertion is raised in arrow-buffer (src/buffer/scalar.rs):

impl<T: ArrowNativeType> From<Buffer> for ScalarBuffer<T> {
    fn from(buffer: Buffer) -> Self {
        let align = std::mem::align_of::<T>();
        let is_aligned = buffer.as_ptr().align_offset(align) == 0;

        match buffer.deallocation() {
            Deallocation::Standard(_) => assert!(
                is_aligned,
                "Memory pointer is not aligned with the specified scalar type"
            ),
            Deallocation::Custom(_, _) =>
                assert!(is_aligned, "Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned."),
        }

        Self {
            buffer,
            phantom: Default::default(),
        }
    }
}

Expected behavior
Ideally, the data would not be misaligned in the first place. However, since the data can come from any Arrow implementation and the Rust implementation is sensitive to alignment, arrow-rs should guard itself against misaligned input: misaligned buffers should be copied into properly aligned allocations rather than causing a panic.

Ideally, the behaviour would be configurable (in the meantime, a caller-side mitigation is sketched after this list):

  • fail on misalignment
  • fix misalignment silently
  • fix misalignment and warn about it
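
Until something along those lines is available in arrow-rs, the copy can be forced on the Python side. The sketch below is a workaround, not a fix; realign is a hypothetical helper (RecordBatch.from_pylist requires a reasonably recent pyarrow), and it assumes that rebuilding a batch from Python objects allocates fresh buffers from pyarrow's default memory pool, which uses 64-byte alignment, so it should avoid the panic in the reproducer at the cost of a full copy:

def realign(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch:
    # rebuilding the batch row by row forces every buffer to be reallocated
    # by pyarrow's default memory pool; simple but slow
    return pyarrow.RecordBatch.from_pylist(batch.to_pylist(), schema=batch.schema)

# in main():
# df = ctx.create_dataframe(
#     [[realign(batch) for batch in partition] for partition in partitions]
# )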
@tustvold (Contributor)

This is actually a longstanding bug in arrow-cpp/pyarrow: apache/arrow#32276.

There are linked solutions on that ticket.
