Skip to content

Commit

Permalink
feat(python): Add user-facing Array class (#396)
Browse files Browse the repository at this point in the history
This PR implements the `nanoarrow.Array`, which is basically a
`pyarrow.ChunkedArray`. This can represent a `Table`, `RecordBatch`,
`ChunkedArray`, and `Array`. It doesn't quite play nicely with pyarrow's
ChunkedArray (but will after the next release, since
`__arrow_c_stream__` was just added there).

The user-facing class is backed by a Cython class, the
`CMaterializedArrayStream`, which manages some of the c-level details
like resolving a chunk + offset when there is more than one chunk in the
array. An early version of this PR implemented the
`CMaterializedArrayStream` using C pointers (e.g., `ArrowArray*
arrays`), but I decided that was too complex and went back to
`List[CArray]`. I think this is also better for managing ownership
(e.g., more unneeded `CArray` instances can be released by the garbage
collector).

The `Array` class as implemented here is device-aware, although until we
have non-CPU support it's difficult to test this. The methods I added
here are basically stubs just to demonstrate the intention.

This PR also implements the `Scalar`, whose main purpose is testing and
other non-performance-sensitive tasks (like lazier reprs for very large
items or interactive inspection). They may also be useful for working
with arrays that contain elements with very long strings or large arrays
(e.g., geometry).

I also added some basic accessors like `buffer()`, `child()`, and some
ways one might want to iterate over an `Array` to make the utility of
this class more clear.

Basic usage:

```python
import nanoarrow as na

na.Array(range(100), na.int64())
```

```
nanoarrow.Array<int64>[100]
0
1
2
3
4
5
6
7
8
9
...and 90 more items
```

More involved example reading from an IPC stream:

```python
import nanoarrow as na
from nanoarrow.ipc import Stream

url = "https://github.com/apache/arrow-testing/raw/master/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_primitive.stream"

with Stream.from_url(url) as inp:
    array = na.Array(inp)

array.child(25)
```

```
nanoarrow.Array<string>[37]
'co矢2p矢m'
'w€acrd'
'kjd1dlô'
'pib矢d5w'
'6nnpwôg'
'ndj£h£4'
'ôôf4aµg'
'kwÂh£fr'
'°g5dk€e'
'r€cbmdn'
...and 27 more items
```

---------

Co-authored-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
paleolimbot and jorisvandenbossche authored Mar 21, 2024
1 parent dc50114 commit 7af6dff
Show file tree
Hide file tree
Showing 13 changed files with 1,077 additions and 108 deletions.
2 changes: 2 additions & 0 deletions python/src/nanoarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
decimal256,
struct,
)
from nanoarrow.array import Array
from nanoarrow._version import __version__ # noqa: F401

# Helps Sphinx automatically populate an API reference section
Expand Down Expand Up @@ -123,4 +124,5 @@
"uint32",
"uint64",
"uint8",
"Array",
]
98 changes: 55 additions & 43 deletions python/src/nanoarrow/_ipc_lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,32 @@ cdef extern from "nanoarrow_ipc.h" nogil:


cdef class PyInputStreamPrivate:
cdef object obj
cdef object obj_method
cdef void* addr
cdef Py_ssize_t size_bytes
cdef int close_stream

def __cinit__(self, obj, close_stream=False):
self.obj = obj
self.obj_method = obj.readinto
self.addr = NULL
self.size_bytes = 0
self.close_stream = close_stream
cdef object _obj
cdef bint _close_obj
cdef void* _addr
cdef Py_ssize_t _size_bytes

def __cinit__(self, obj, close_obj=False):
self._obj = obj
self._close_obj = close_obj
self._addr = NULL
self._size_bytes = 0

@property
def obj(self):
return self._obj

@property
def close_obj(self):
return self._close_obj

def set_buffer(self, uintptr_t addr, Py_ssize_t size_bytes):
self._addr = <void*>addr
self._size_bytes = size_bytes

# Needed for at least some implementations of readinto()
def __len__(self):
return self.size_bytes
return self._size_bytes

# Implement the buffer protocol so that this object can be used as
# the argument to xxx.readinto(). This ensures that no extra copies
Expand All @@ -75,41 +85,43 @@ cdef class PyInputStreamPrivate:
# implementation before issuing each read call (two per message, with
# an extra call for a RecordBatch message to get the actual buffer data).
def __getbuffer__(self, Py_buffer* buffer, int flags):
PyBuffer_FillInfo(buffer, self, self.addr, self.size_bytes, 0, flags)
PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, 0, flags)

def __releasebuffer__(self, Py_buffer* buffer):
pass


cdef ArrowErrorCode py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* buf,
int64_t buf_size_bytes, int64_t* size_read_out,
ArrowError* error) noexcept:
cdef PyInputStreamPrivate stream_private = <object>stream.private_data
stream_private.addr = buf
stream_private.size_bytes = buf_size_bytes

try:
size_read_out[0] = stream_private.obj_method(stream_private)
return NANOARROW_OK
except Exception as e:
cls = type(e).__name__.encode()
msg = str(e).encode()
snprintf(
error.message,
sizeof(error.message),
"%s: %s",
<const char*>cls,
<const char*>msg
)
return EIO


cdef void py_input_stream_release(ArrowIpcInputStream* stream) noexcept:
cdef PyInputStreamPrivate stream_private = <object>stream.private_data
if stream_private.close_stream:
stream_private.obj.close()

Py_DECREF(stream_private)
ArrowError* error) noexcept nogil:

with gil:
stream_private = <object>stream.private_data
stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

try:
size_read_out[0] = stream_private.obj.readinto(stream_private)
return NANOARROW_OK
except Exception as e:
cls = type(e).__name__.encode()
msg = str(e).encode()
snprintf(
error.message,
sizeof(error.message),
"%s: %s",
<const char*>cls,
<const char*>msg
)
return EIO

cdef void py_input_stream_release(ArrowIpcInputStream* stream) noexcept nogil:
with gil:
stream_private = <object>stream.private_data
if stream_private.close_obj:
stream_private.obj.close()

Py_DECREF(stream_private)

stream.private_data = NULL
stream.release = NULL

Expand All @@ -136,9 +148,9 @@ cdef class CIpcInputStream:
return False

@staticmethod
def from_readable(obj, close_stream=False):
def from_readable(obj, close_obj=False):
cdef CIpcInputStream stream = CIpcInputStream()
cdef PyInputStreamPrivate private_data = PyInputStreamPrivate(obj, close_stream)
cdef PyInputStreamPrivate private_data = PyInputStreamPrivate(obj, close_obj)

stream._stream.private_data = <PyObject*>private_data
Py_INCREF(private_data)
Expand Down
126 changes: 124 additions & 2 deletions python/src/nanoarrow/_lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ generally have better autocomplete + documentation available to IDEs).
from libc.stdint cimport uintptr_t, uint8_t, int64_t
from libc.string cimport memcpy
from libc.stdio cimport snprintf
from libc.errno cimport ENOMEM
from cpython.bytes cimport PyBytes_FromStringAndSize
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, PyCapsule_IsValid
from cpython cimport (
Expand Down Expand Up @@ -1066,10 +1067,11 @@ cdef class CArray:
return out

def __getitem__(self, k):
self._assert_valid()

if not isinstance(k, slice):
raise TypeError(
f"Can't slice CArray with object of type {type(k).__name__}"
)
f"Can't subset CArray with object of type {type(k).__name__}")

if k.step is not None:
raise ValueError("Can't slice CArray with step")
Expand Down Expand Up @@ -2198,6 +2200,126 @@ cdef class CArrayStream:
return _repr_utils.array_stream_repr(self)


cdef class CMaterializedArrayStream:
    """An ArrowArrayStream that has been fully consumed into a list of chunks.

    Holds a schema, zero or more ``CArray`` chunks, and a buffer of
    cumulative chunk end offsets so that a logical element index can be
    resolved to a ``(chunk, offset_within_chunk)`` pair in O(log n).
    """

    # Schema shared by every chunk in _arrays
    cdef CSchema _schema
    # int64 buffer of cumulative chunk end offsets; seeded with a single 0
    # so that entry i is the logical start of chunk i and entry i + 1 its end
    cdef CBuffer _array_ends
    # List[CArray]: the materialized chunks (zero-length chunks are excluded
    # by the from_* constructors)
    cdef list _arrays
    # Sum of the lengths of all chunks
    cdef int64_t _total_length

    def __cinit__(self):
        self._arrays = []
        self._total_length = 0
        self._schema = CSchema.allocate()
        self._array_ends = CBuffer.empty()
        # Seed the cumulative-ends buffer with the leading zero offset
        cdef int code = ArrowBufferAppendInt64(self._array_ends._ptr, 0)
        Error.raise_error_not_ok("ArrowBufferAppendInt64()", code)

    cdef _finalize(self):
        # Stamp the raw ends buffer with its logical element type so it can
        # be indexed as int64 values (used by __len__)
        self._array_ends._set_data_type(NANOARROW_TYPE_INT64)

    @property
    def schema(self):
        """The ``CSchema`` shared by all chunks in this stream."""
        return self._schema

    def __getitem__(self, k):
        """Resolve logical index *k* to a ``(chunk, index_in_chunk)`` tuple.

        Negative indices are supported; slices are not yet implemented.

        Raises
        ------
        IndexError
            If *k* is out of range after negative-index normalization.
        """
        cdef int64_t kint
        cdef int array_i
        cdef const int64_t* sorted_offsets = <int64_t*>self._array_ends._ptr.data

        if isinstance(k, slice):
            raise NotImplementedError("index with slice")

        kint = k
        if kint < 0:
            kint += self._total_length
        if kint < 0 or kint >= self._total_length:
            raise IndexError(f"Index {kint} is out of range")

        # Locate the chunk whose [start, end) range contains kint
        # (presumably a binary search over the cumulative ends — confirm
        # against the nanoarrow C sources)
        array_i = ArrowResolveChunk64(kint, sorted_offsets, 0, len(self._arrays))
        # Convert the logical index into an offset within the resolved chunk
        kint -= sorted_offsets[array_i]
        return self._arrays[array_i], kint

    def __len__(self):
        # The last cumulative end offset equals the total logical length
        return self._array_ends[len(self._arrays)]

    def __iter__(self):
        """Iterate over all elements as ``(chunk, index_in_chunk)`` tuples."""
        for c_array in self._arrays:
            for item_i in range(c_array.length):
                yield c_array, item_i

    def array(self, int64_t i):
        """Return chunk *i* as a ``CArray``."""
        return self._arrays[i]

    @property
    def n_arrays(self):
        """The number of materialized chunks."""
        return len(self._arrays)

    @property
    def arrays(self):
        """An iterator over the chunks as ``CArray`` instances."""
        return iter(self._arrays)

    def __arrow_c_stream__(self, requested_schema=None):
        """Export via the Arrow PyCapsule stream protocol.

        Chunks are shallow-copied (``move=False``) into a fresh
        ``CArrayStream`` so this object remains valid after export.
        """
        # When an array stream from iterable is supported, that could be used
        # here to avoid unnecessary shallow copies.
        stream = CArrayStream.from_array_list(self._arrays, self._schema, move=False)
        return stream.__arrow_c_stream__(requested_schema=requested_schema)

    def child(self, int64_t i):
        """Return a new stream wrapping child field *i* of every chunk."""
        cdef CMaterializedArrayStream out = CMaterializedArrayStream()
        cdef int code

        out._schema = self._schema.child(i)
        out._arrays = [chunk.child(i) for chunk in self._arrays]
        # Rebuild the cumulative end offsets from the child chunk lengths
        for child_chunk in out._arrays:
            out._total_length += child_chunk.length
            code = ArrowBufferAppendInt64(out._array_ends._ptr, out._total_length)
            Error.raise_error_not_ok("ArrowBufferAppendInt64()", code)

        out._finalize()
        return out

    @staticmethod
    def from_c_array(CArray array):
        """Wrap a single ``CArray`` as a stream with at most one chunk.

        A zero-length input contributes no chunk; the result keeps the
        schema but has length 0.
        """
        array._assert_valid()

        cdef CMaterializedArrayStream out = CMaterializedArrayStream()
        out._schema = array._schema

        if array._ptr.length == 0:
            out._finalize()
            return out

        out._arrays.append(array)
        out._total_length += array._ptr.length
        cdef int code = ArrowBufferAppendInt64(out._array_ends._ptr, out._total_length)
        Error.raise_error_not_ok("ArrowBufferAppendInt64()", code)

        out._finalize()
        return out

    @staticmethod
    def from_c_array_stream(CArrayStream stream):
        """Consume *stream*, materializing every non-empty batch as a chunk.

        The stream is released when consumption finishes (via the context
        manager protocol on ``CArrayStream``).
        """
        stream._assert_valid()
        cdef CMaterializedArrayStream out = CMaterializedArrayStream()
        cdef int code
        cdef CArray array

        with stream:
            for array in stream:
                # Skip zero-length batches so chunk resolution never has to
                # handle empty chunks
                if array._ptr.length == 0:
                    continue

                out._total_length += array._ptr.length
                code = ArrowBufferAppendInt64(out._array_ends._ptr, out._total_length)
                Error.raise_error_not_ok("ArrowBufferAppendInt64()", code)
                out._arrays.append(array)

            out._schema = stream._get_cached_schema()

        out._finalize()
        return out


cdef class CDeviceArray:
cdef object _base
cdef ArrowDeviceArray* _ptr
Expand Down
9 changes: 9 additions & 0 deletions python/src/nanoarrow/_repr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ def make_class_label(obj, module=None):
return f"{module}.{obj.__class__.__name__}"


def c_schema_to_string(obj, max_char_width=80):
    """Render *obj*'s schema on one line, truncated with ``...`` if too long.

    The effective width is clamped to a minimum of 10 characters. One extra
    character is requested from ``_to_string()`` so that an exact-width
    result can be distinguished from a truncated one.
    """
    width = 10 if max_char_width < 10 else max_char_width
    rendered = obj._to_string(recursive=True, max_chars=width + 1)
    if len(rendered) <= width:
        return rendered
    # Reserve three characters for the ellipsis marker
    return rendered[: width - 3] + "..."


def schema_repr(schema, indent=0):
indent_str = " " * indent
class_label = make_class_label(schema, module="nanoarrow.c_lib")
Expand Down
Loading

0 comments on commit 7af6dff

Please sign in to comment.