From 834a2f1585e3a59d43217584e21b2a796e211956 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 24 Jul 2024 13:08:13 -0500 Subject: [PATCH] review comments --- ci/scripts/bundle.py | 1 + src/nanoarrow/ipc/encoder.c | 33 ++++++++++++++++++++----------- src/nanoarrow/ipc/encoder_test.cc | 11 ++--------- src/nanoarrow/nanoarrow_ipc.h | 18 ++++++++++++----- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py index f152389d2..7ee09437f 100644 --- a/ci/scripts/bundle.py +++ b/ci/scripts/bundle.py @@ -203,6 +203,7 @@ def bundle_nanoarrow_ipc( [ src_dir / "ipc" / "flatcc_generated.h", src_dir / "ipc" / "decoder.c", + src_dir / "ipc" / "encoder.c", src_dir / "ipc" / "reader.c", ] ) diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c index c093b21b3..0d972e3a9 100644 --- a/src/nanoarrow/ipc/encoder.c +++ b/src/nanoarrow/ipc/encoder.c @@ -59,11 +59,12 @@ struct ArrowIpcEncoderPrivate { flatcc_builder_t builder; - struct ArrowBuffer buffers, nodes; + struct ArrowBuffer buffers; + struct ArrowBuffer nodes; }; ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { - NANOARROW_DCHECK(encoder); + NANOARROW_DCHECK(encoder != NULL); memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); encoder->encode_buffer = NULL; encoder->encode_buffer_state = NULL; @@ -71,16 +72,17 @@ ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate)); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; - ArrowBufferInit(&private->buffers); - ArrowBufferInit(&private->nodes); if (flatcc_builder_init(&private->builder) == -1) { + ArrowFree(private); return ESPIPE; } + ArrowBufferInit(&private->buffers); + ArrowBufferInit(&private->nodes); return NANOARROW_OK; } void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { - NANOARROW_DCHECK(encoder && encoder->private_data); + NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; flatcc_builder_clear(&private->builder); @@ -92,17 +94,24 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, struct ArrowBuffer* out) { - NANOARROW_DCHECK(encoder && encoder->private_data && out); + NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; - ArrowBufferReset(out); - size_t size = flatcc_builder_get_buffer_size(&private->builder); + + int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder); if (size == 0) { // Finalizing an empty flatcc_builder_t triggers an assertion - return NANOARROW_OK; + return ArrowBufferResize(out, 0, 0); + } + + void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL); + if (data == NULL) { + return ENOMEM; } + NANOARROW_RETURN_NOT_OK(ArrowBufferResize(out, size, 0)); + memcpy(out->data, data, size); - out->size_bytes = out->capacity_bytes = (int64_t)size; - out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); - return out->data ? NANOARROW_OK : ENOMEM; + // don't deallocate yet, just wipe the builder's current Message + flatcc_builder_reset(&private->builder); + return NANOARROW_OK; } diff --git a/src/nanoarrow/ipc/encoder_test.cc b/src/nanoarrow/ipc/encoder_test.cc index 036d4230f..8f78b0631 100644 --- a/src/nanoarrow/ipc/encoder_test.cc +++ b/src/nanoarrow/ipc/encoder_test.cc @@ -15,25 +15,18 @@ // specific language governing permissions and limitations // under the License. -#include -#include -#include -#include -#include -#include #include #include "flatcc/flatcc_builder.h" #include "nanoarrow/nanoarrow.hpp" #include "nanoarrow/nanoarrow_ipc.hpp" -using namespace arrow; - // Copied from encoder.c so we can test the internal state extern "C" { struct ArrowIpcEncoderPrivate { flatcc_builder_t builder; - struct ArrowBuffer buffers, nodes; + struct ArrowBuffer buffers; + struct ArrowBuffer nodes; }; } diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index 560fc5f0d..e20f87f14 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -401,13 +401,10 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( /// initialized using ArrowIpcEncoderInit(), and released with /// ArrowIpcEncoderReset(). struct ArrowIpcEncoder { - /// \brief Compression to encode in the next RecordBatch message + /// \brief Compression to encode in the next RecordBatch message. enum ArrowIpcCompressionType codec; - /// \brief Finalized body length of the most recently encoded RecordBatch message - int64_t body_length; - - /// \brief Callback invoked against each buffer to be encoded. + /// \brief Callback invoked against each buffer to be encoded /// /// Encoding of buffers is left as a callback to accommodate dissociated data storage. /// One implementation of this callback might copy all buffers into a contiguous body @@ -420,17 +417,28 @@ struct ArrowIpcEncoder { /// \brief Pointer to arbitrary data used by encode_buffer() void* encode_buffer_state; + /// \brief Finalized body length of the most recently encoded RecordBatch message + /// + /// (This is initially 0 and encode_buffer() is expected to update it. After all + /// buffers are encoded, this will be written to the RecordBatch's .bodyLength) + int64_t body_length; + /// \brief Private resources managed by this library void* private_data; }; /// \brief Initialize an encoder +/// +/// If NANOARROW_OK is returned, the caller must call ArrowIpcEncoderReset() +/// to release resources allocated by this function. ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder); /// \brief Release all resources attached to an encoder void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder); /// \brief Finalize the most recently encoded message to a buffer +/// +/// The output buffer will be resized and the encoded message copied in. ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, struct ArrowBuffer* out); /// @}