Skip to content

Commit

Permalink
feat(python): Implement array from buffer for non-CPU arrays (#550)
Browse files Browse the repository at this point in the history
Requires building with (e.g.) `export NANOARROW_PYTHON_CUDA=/usr/local/cuda` and a `cupy` install:

```python
import nanoarrow as na
from nanoarrow import device
import cupy as cp


device.c_device_array(cp.array([1, 2, 3]))
#> <nanoarrow.device.CDeviceArray>
#> - device_type: CUDA <2>
#> - device_id: 0
#> - array: <nanoarrow.c_array.CArray int64>
#>   - length: 3
#>   - offset: 0
#>   - null_count: 0
#>   - buffers: (0, 133980798058496)
#>   - dictionary: NULL
#>   - children[0]:

# Also roundtrips
darray = device.c_device_array(cp.array([1, 2, 3]))
cp.from_dlpack(darray.array.view().buffer(1))
#> array([1, 2, 3])
```

---------

Co-authored-by: Dane Pitkin <[email protected]>
  • Loading branch information
paleolimbot and danepitkin authored Sep 18, 2024
1 parent 48168e0 commit 6118e24
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 72 deletions.
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def get_version(pkg_path):

device_include_dirs.append(str(include_dir))
device_libraries.append("cuda")
device_define_macros.append(("NANOARROW_DEVICE_WITH_CUDA", 1))
extra_define_macros.append(("NANOARROW_DEVICE_WITH_CUDA", 1))

# Library might be already in a system library directory such that no -L flag
# is needed
Expand Down
7 changes: 4 additions & 3 deletions python/src/nanoarrow/_array.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ from nanoarrow_device_c cimport (
ArrowDeviceType
)

from nanoarrow._device cimport Device
from nanoarrow._device cimport CSharedSyncEvent
from nanoarrow._schema cimport CSchema


Expand All @@ -39,15 +39,16 @@ cdef class CArray:
cdef CSchema _schema
cdef ArrowDeviceType _device_type
cdef int _device_id
cdef void* _sync_event

cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id)
cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id, void* sync_event)


cdef class CArrayView:
cdef object _base
cdef object _array_base
cdef ArrowArrayView* _ptr
cdef Device _device
cdef CSharedSyncEvent _event

cdef class CDeviceArray:
cdef object _base
Expand Down
110 changes: 85 additions & 25 deletions python/src/nanoarrow/_array.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,14 @@ from nanoarrow_c cimport (
NANOARROW_OK,
)


from nanoarrow_device_c cimport (
ARROW_DEVICE_CPU,
ArrowDeviceType,
ArrowDeviceArray,
ArrowDeviceArrayInit,
)

from nanoarrow._device cimport Device
from nanoarrow._device cimport Device, CSharedSyncEvent

from nanoarrow._buffer cimport CBuffer, CBufferView
from nanoarrow._schema cimport CSchema, CLayout
Expand Down Expand Up @@ -107,7 +106,7 @@ cdef class CArrayView:
def __cinit__(self, object base, uintptr_t addr):
self._base = base
self._ptr = <ArrowArrayView*>addr
self._device = DEVICE_CPU
self._event = CSharedSyncEvent(DEVICE_CPU)

def _set_array(self, CArray array, Device device=DEVICE_CPU):
cdef Error error = Error()
Expand All @@ -120,7 +119,8 @@ cdef class CArrayView:

error.raise_message_not_ok("ArrowArrayViewSetArray()", code)
self._array_base = array._base
self._device = device
self._event = CSharedSyncEvent(device, <uintptr_t>array._sync_event)

return self

@property
Expand Down Expand Up @@ -160,7 +160,7 @@ cdef class CArrayView:
self._ptr.null_count = 0
elif validity_bits == NULL:
self._ptr.null_count = 0
elif self._device is DEVICE_CPU:
elif self._event.device is DEVICE_CPU:
self._ptr.null_count = ArrowArrayViewComputeNullCount(self._ptr)

return self._ptr.null_count
Expand All @@ -178,7 +178,8 @@ cdef class CArrayView:
<uintptr_t>self._ptr.children[i]
)

child._device = self._device
child._event = self._event

return child

@property
Expand Down Expand Up @@ -227,7 +228,7 @@ cdef class CArrayView:
buffer_view.size_bytes,
self._ptr.layout.buffer_data_type[i],
self._ptr.layout.element_size_bits[i],
self._device
self._event
)

@property
Expand All @@ -239,11 +240,14 @@ cdef class CArrayView:
def dictionary(self):
if self._ptr.dictionary == NULL:
return None
else:
return CArrayView(
self,
<uintptr_t>self._ptr.dictionary
)

cdef CArrayView dictionary = CArrayView(
self,
<uintptr_t>self._ptr.dictionary
)
dictionary._event = self._event

return dictionary

def __repr__(self):
return _repr_utils.array_view_repr(self)
Expand Down Expand Up @@ -288,11 +292,13 @@ cdef class CArray:
self._ptr = <ArrowArray*>addr
self._schema = schema
self._device_type = ARROW_DEVICE_CPU
self._device_id = 0
self._device_id = -1
self._sync_event = NULL

cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id):
cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id, void* sync_event):
self._device_type = device_type
self._device_id = device_id
self._sync_event = sync_event

@staticmethod
def _import_from_c_capsule(schema_capsule, array_capsule) -> CArray:
Expand Down Expand Up @@ -350,7 +356,8 @@ cdef class CArray:
c_array_out.offset = c_array_out.offset + start
c_array_out.length = stop - start
cdef CArray out = CArray(base, <uintptr_t>c_array_out, self._schema)
out._set_device(self._device_type, self._device_id)
out._set_device(self._device_type, self._device_id, self._sync_event)

return out

def __arrow_c_array__(self, requested_schema=None):
Expand Down Expand Up @@ -466,7 +473,7 @@ cdef class CArray:
<uintptr_t>self._ptr.children[i],
self._schema.child(i)
)
out._set_device(self._device_type, self._device_id)
out._set_device(self._device_type, self._device_id, self._sync_event)
return out

@property
Expand All @@ -480,7 +487,7 @@ cdef class CArray:
cdef CArray out
if self._ptr.dictionary != NULL:
out = CArray(self, <uintptr_t>self._ptr.dictionary, self._schema.dictionary)
out._set_device(self._device_type, self._device_id)
out._set_device(self._device_type, self._device_id, self._sync_event)
return out
else:
return None
Expand All @@ -497,22 +504,24 @@ cdef class CArrayBuilder:
"""
cdef CArray c_array
cdef ArrowArray* _ptr
cdef Device _device
cdef bint _can_validate

def __cinit__(self, CArray array):
def __cinit__(self, CArray array, Device device=DEVICE_CPU):
self.c_array = array
self._ptr = array._ptr
self._can_validate = True
self._device = device
self._can_validate = device is DEVICE_CPU

@staticmethod
def allocate():
def allocate(Device device=DEVICE_CPU):
"""Create a CArrayBuilder
Allocates memory for an ArrowArray and populates it with nanoarrow's
ArrowArray private_data/release callback implementation. This should
usually be followed by :meth:`init_from_type` or :meth:`init_from_schema`.
"""
return CArrayBuilder(CArray.allocate(CSchema.allocate()))
return CArrayBuilder(CArray.allocate(CSchema.allocate()), device)

def is_empty(self) -> bool:
"""Check if any items have been appended to this builder"""
Expand Down Expand Up @@ -550,6 +559,9 @@ cdef class CArrayBuilder:
Calling this method is required to produce a valid array prior to calling
:meth:`append_strings` or `append_bytes`.
"""
if self._device != DEVICE_CPU:
raise ValueError("Can't append to non-CPU array")

cdef int code = ArrowArrayStartAppending(self._ptr)
Error.raise_error_not_ok("ArrowArrayStartAppending()", code)
return self
Expand Down Expand Up @@ -617,7 +629,11 @@ cdef class CArrayBuilder:
return self

def resolve_null_count(self) -> CArrayBuilder:
"""Ensure the output null count is synchronized with existing buffers"""
"""Ensure the output null count is synchronized with existing buffers

Note that this will not attempt to access non-CPU buffers such that
:attr:`null_count` might still be -1 after calling this method.
"""
self.c_array._assert_valid()

# This doesn't apply to unions. We currently don't have a schema view
Expand All @@ -636,6 +652,10 @@ cdef class CArrayBuilder:
self._ptr.null_count = 0
return self

# Don't attempt to access a non-cpu buffer
if self._device != DEVICE_CPU:
return self

# From _ArrowBytesForBits(), which is not included in nanoarrow_c.pxd
# because it's an internal inline function.
cdef int64_t bits = self._ptr.offset + self._ptr.length
Expand Down Expand Up @@ -669,6 +689,14 @@ cdef class CArrayBuilder:
if i < 0 or i > 3:
raise IndexError("i must be >= 0 and <= 3")

if buffer._device != self._device:
raise ValueError(
f"Builder device ({self._device.device_type}/{self._device.device_id})"
" and buffer device "
f"({buffer._device.device_type}/{buffer._device.device_id})"
" are not identical"
)

self.c_array._assert_valid()
if not move:
buffer = CBuffer.from_pybuffer(buffer)
Expand All @@ -694,6 +722,26 @@ cdef class CArrayBuilder:
if child._ptr.release != NULL:
ArrowArrayRelease(child._ptr)

if (
self._device.device_type_id != c_array.device_type_id
or self._device.device_id != c_array.device_id
):
raise ValueError(
f"Builder device ({self._device.device_type}/{self._device.device_id})"
" and child device "
f"({c_array.device_type}/{c_array.device_id}) are not identical"
)

# There is probably a way to avoid a full synchronize for each child
# (e.g., perhaps the ArrayBuilder could allocate a stream to use such
# that an event can be allocated on finish_device() and synchronization
# could be avoided entirely). Including this for now for safety.
cdef CSharedSyncEvent sync = CSharedSyncEvent(
self._device,
<uintptr_t>c_array._sync_event
)
sync.synchronize()

if not move:
c_array_shallow_copy(c_array._base, c_array._ptr, child._ptr)
else:
Expand Down Expand Up @@ -747,6 +795,20 @@ cdef class CArrayBuilder:

return out

def finish_device(self):
"""Finish building this array and export to an ArrowDeviceArray
Calls :meth:`finish`, propagating device information into an ArrowDeviceArray.
"""
# Returns a CDeviceArray built from the freshly allocated ArrowDeviceArray
# and the schema of the finished array.
#
# Finish the build first; the resulting CArray supplies both the ArrowArray
# pointer handed to ArrowDeviceArrayInit() and the schema used for the result.
cdef CArray array = self.finish()

# alloc_c_device_array() fills in device_array_ptr and returns a holder
# object; the holder is passed on to the CDeviceArray constructed below.
cdef ArrowDeviceArray* device_array_ptr
holder = alloc_c_device_array(&device_array_ptr)
# NOTE(review): the trailing NULL is presumably the sync event, i.e. no
# event is attached to the exported device array -- confirm against the
# ArrowDeviceArrayInit() declaration.
cdef int code = ArrowDeviceArrayInit(self._device._ptr, device_array_ptr, array._ptr, NULL)
# Hand the return code to the shared error helper (presumably raises when
# the code is not NANOARROW_OK -- confirm against Error.raise_error_not_ok).
Error.raise_error_not_ok("ArrowDeviceArrayInit", code)

return CDeviceArray(holder, <uintptr_t>device_array_ptr, array._schema)


cdef class CDeviceArray:
"""Low-level ArrowDeviceArray wrapper
Expand Down Expand Up @@ -792,10 +854,8 @@ cdef class CDeviceArray:

@property
def array(self) -> CArray:
# TODO: We lose access to the sync_event here, so we probably need to
# synchronize (or propagate it, or somehow prevent data access downstream)
cdef CArray array = CArray(self, <uintptr_t>&self._ptr.array, self._schema)
array._set_device(self._ptr.device_type, self._ptr.device_id)
array._set_device(self._ptr.device_type, self._ptr.device_id, self._ptr.sync_event)
return array

def view(self) -> CArrayView:
Expand Down
4 changes: 2 additions & 2 deletions python/src/nanoarrow/_buffer.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ from nanoarrow_c cimport (
ArrowType,
)

from nanoarrow._device cimport Device
from nanoarrow._device cimport Device, CSharedSyncEvent


cdef class CBufferView:
cdef object _base
cdef ArrowBufferView _ptr
cdef ArrowType _data_type
cdef Device _device
cdef CSharedSyncEvent _event
cdef Py_ssize_t _element_size_bits
cdef Py_ssize_t _shape
cdef Py_ssize_t _strides
Expand Down
Loading

0 comments on commit 6118e24

Please sign in to comment.