[FlightRPC] Cannot use flight data with DataFusion (Rust) #43552

Open
EnricoMi opened this issue Aug 5, 2024 · 1 comment

EnricoMi commented Aug 5, 2024

Describe the bug, including details regarding any error messages, version, and platform.

Fetching data via Apache Arrow Flight (the C++ implementation, used through PyArrow) and passing the resulting record batches to Apache DataFusion (Rust) fails with:

Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type.
Before importing buffer through FFI, please make sure the allocation is aligned.

This is likely due to #32276 / #36400.

Error:

thread '<unnamed>' panicked at /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-buffer-52.0.0/src/buffer/scalar.rs:138:17:
Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.
stack backtrace:
   0:     0x7f4d25f576ea - <std::sys::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h584e154fdf2d8641
   1:     0x7f4d24a0eb7b - core::fmt::write::h810564c4cb1595da
   2:     0x7f4d25f252a2 - std::io::Write::write_fmt::ha00a1de7318f2a48
   3:     0x7f4d25f5cb29 - std::sys::backtrace::print::h6238e978e425409a
   4:     0x7f4d25f5c316 - std::panicking::default_hook::{{closure}}::h0acffbc0a684bdb8
   5:     0x7f4d25f5d6f5 - std::panicking::rust_panic_with_hook::hcdf40f293c76fc9f
   6:     0x7f4d25f5cec2 - std::panicking::begin_panic_handler::{{closure}}::hfd12f36809a34009
   7:     0x7f4d25f5ce59 - std::sys::backtrace::__rust_end_short_backtrace::h6a9267615b3cf1db
   8:     0x7f4d25f5ce44 - rust_begin_unwind
   9:     0x7f4d24a0d4f2 - core::panicking::panic_fmt::hcabcb14b752ed0b3
  10:     0x7f4d246177b1 - arrow_buffer::buffer::scalar::ScalarBuffer<T>::new::h04fe130fe772026c
  11:     0x7f4d254255a0 - arrow_array::array::get_offsets::h216d5a4c8918fc01
  12:     0x7f4d2438af33 - arrow_array::array::make_array::h2cb6f33b6d6b2c59
  13:     0x7f4d24390613 - <arrow_array::array::struct_array::StructArray as core::convert::From<arrow_data::data::ArrayData>>::from::h07d61a4146018136
  14:     0x7f4d24246b46 - <arrow_array::record_batch::RecordBatch as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::h1af3cfef0e2e6e5a
  15:     0x7f4d23f169cb - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::h46b0b854c3de8071
  16:     0x7f4d240b7375 - <alloc::vec::Vec<T> as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::he22d7662854ef688
  17:     0x7f4d23f1864b - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::hd78ea1387d583d39
  18:     0x7f4d240293e0 - pyo3::impl_::extract_argument::extract_argument::hf1fa544aa176067c
  19:     0x7f4d2413265f - datafusion_python::context::PySessionContext::__pymethod_create_dataframe__::h0938b7fdddde4e81
  20:     0x7f4d24026e61 - pyo3::impl_::trampoline::trampoline::h86dfdb741875b71a
  21:     0x7f4d2412a441 - datafusion_python::context::<impl pyo3::impl_::pyclass::PyMethods<datafusion_python::context::PySessionContext> for pyo3::impl_::pyclass::PyClassImplCollector<datafusion_python::context::PySessionContext>>::py_methods::ITEMS::trampoline::hd70e045974ca5aa8
  22:     0x560f47b75569 - <unknown>
  23:     0x560f47b5cb2b - _PyEval_EvalFrameDefault
  24:     0x560f47b746ac - _PyFunction_Vectorcall
  25:     0x560f47b5c935 - _PyEval_EvalFrameDefault
  26:     0x560f47b59096 - <unknown>
  27:     0x560f47c4ef66 - PyEval_EvalCode
  28:     0x560f47c79e98 - <unknown>
  29:     0x560f47c7379b - <unknown>
  30:     0x560f47c79be5 - <unknown>
  31:     0x560f47c790c8 - _PyRun_SimpleFileObject
  32:     0x560f47c78d13 - _PyRun_AnyFileObject
  33:     0x560f47c6b70e - Py_RunMain
  34:     0x560f47c41dfd - Py_BytesMain
  35:     0x7f4d5a029d90 - __libc_start_call_main
                               at ./csu/../sysdeps/nptl/libc_start_call_main.h:58:16
  36:     0x7f4d5a029e40 - __libc_start_main_impl
                               at ./csu/../csu/libc-start.c:392:3
  37:     0x560f47c41cf5 - _start
  38:                0x0 - <unknown>
Traceback (most recent call last):
  File "/home/enrico/git/arrow-datafusion-issue/example.py", line 41, in <module>
    main(sys.argv[1])
  File "/home/enrico/git/arrow-datafusion-issue/example.py", line 33, in main
    df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
pyo3_runtime.PanicException: Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.

Reproduce as follows:

git clone --depth=1 https://github.com/apache/arrow.git
git clone --depth=1 https://github.com/apache/arrow-testing.git

python -m venv venv
source venv/bin/activate
pip install pyarrow pandas datafusion

# in a separate terminal: start the example Flight server (example.py connects to localhost:5005)
python arrow/python/examples/flight/server.py
RUST_BACKTRACE=1 python example.py arrow-testing/data/csv/aggregate_test_100.csv

with example.py:

import sys

import datafusion
import pyarrow
import pyarrow.flight
import pyarrow.csv as csv


def push_data(client, path):
    my_table = csv.read_csv(path).select(["c1"])
    writer, _ = client.do_put(pyarrow.flight.FlightDescriptor.for_path("file"), my_table.schema)
    writer.write_table(my_table)
    writer.close()


def get_data(client):
    descriptor = pyarrow.flight.FlightDescriptor.for_path("file")
    info = client.get_flight_info(descriptor)
    for endpoint in info.endpoints:
        for location in endpoint.locations:
            get_client = pyarrow.flight.FlightClient(location)
            reader = get_client.do_get(endpoint.ticket)
            yield reader.to_reader()


def main(path):
    client = pyarrow.flight.FlightClient("grpc+tcp://localhost:5005")
    push_data(client, path)
    partitions = get_data(client)

    ctx = datafusion.SessionContext()
    df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
    print(df)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Provide the path to example CSV file")
        sys.exit(1)
    main(sys.argv[1])

The panic is raised in the Apache Arrow Rust implementation (arrow-rs): https://github.com/apache/arrow-rs/blob/eddef43d1cb46c1287da187ea1d86b0e1dc35a13/arrow-buffer/src/buffer/scalar.rs#L138

let align = std::mem::align_of::<T>();
let is_aligned = buffer.as_ptr().align_offset(align) == 0;

match buffer.deallocation() {
    Deallocation::Standard(_) => assert!(
        is_aligned,
        "Memory pointer is not aligned with the specified scalar type"
    ),
    Deallocation::Custom(_, _) =>
        assert!(is_aligned, "Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned."),
}

In my environment, align is 4 for column c1 (i32) and 8 for column c2 (i64), while buffer.as_ptr().align_offset(align) is always 3 and 7, respectively; arrow-rs requires it to be 0.
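These numbers follow from how the check is defined. For a power-of-two alignment, `align_offset` is the number of bytes needed to move the pointer up to the next multiple of `align`. The minimal Python restatement below is an illustration, not arrow-rs code, and the addresses in it are hypothetical. It shows that offsets of 3 (i32) and 7 (i64) are both consistent with a buffer that starts one byte past an aligned address.

```python
def align_offset(address: int, align: int) -> int:
    """Bytes to add to `address` to reach a multiple of `align`.

    Mirrors what arrow-rs gets from `buffer.as_ptr().align_offset(align)`
    for a byte pointer and a power-of-two `align`.
    """
    if align <= 0 or align & (align - 1):
        raise ValueError("align must be a positive power of two")
    return (-address) % align


def is_aligned(address: int, align: int) -> bool:
    # arrow-rs accepts the buffer only when no adjustment is needed
    return align_offset(address, align) == 0


base = 0x1000       # hypothetical 64-byte aligned allocation
shifted = base + 1  # hypothetical buffer starting one byte later

for align in (4, 8):  # align_of::<i32>() and align_of::<i64>() on typical 64-bit targets
    print(align, align_offset(shifted, align), is_aligned(shifted, align))
# prints:
# 4 3 False
# 8 7 False
```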

Component(s)

FlightRPC

EnricoMi changed the title from "[FlightRPC] Cannot use flight data with datafusion (Rust)" to "[FlightRPC] Cannot use flight data with DataFusion (Rust)" on Aug 5, 2024

EnricoMi commented Sep 27, 2024

This can be fixed in arrow-rs (the Rust implementation) by copying the misaligned data into aligned memory: apache/arrow-rs#6462 and apache/arrow-rs#6471.

Ideally, the data would not be misaligned in the first place, so the memory could be reused without a costly copy.
