Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Jul 24, 2024
1 parent 45cbf82 commit 834a2f1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 26 deletions.
1 change: 1 addition & 0 deletions ci/scripts/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def bundle_nanoarrow_ipc(
[
src_dir / "ipc" / "flatcc_generated.h",
src_dir / "ipc" / "decoder.c",
src_dir / "ipc" / "encoder.c",
src_dir / "ipc" / "reader.c",
]
)
Expand Down
33 changes: 21 additions & 12 deletions src/nanoarrow/ipc/encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,30 @@

struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
struct ArrowBuffer buffers, nodes;
struct ArrowBuffer buffers;
struct ArrowBuffer nodes;
};

ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder);
NANOARROW_DCHECK(encoder != NULL);
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
encoder->encode_buffer = NULL;
encoder->encode_buffer_state = NULL;
encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
ArrowBufferInit(&private->buffers);
ArrowBufferInit(&private->nodes);
if (flatcc_builder_init(&private->builder) == -1) {
ArrowFree(private);
return ESPIPE;
}
ArrowBufferInit(&private->buffers);
ArrowBufferInit(&private->nodes);
return NANOARROW_OK;
}

void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder && encoder->private_data);
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL);
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
flatcc_builder_clear(&private->builder);
Expand All @@ -92,17 +94,24 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {

ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* out) {
NANOARROW_DCHECK(encoder && encoder->private_data && out);
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL);
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
ArrowBufferReset(out);
size_t size = flatcc_builder_get_buffer_size(&private->builder);

int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
if (size == 0) {
// Finalizing an empty flatcc_builder_t triggers an assertion
return NANOARROW_OK;
return ArrowBufferResize(out, 0, 0);
}

void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
if (data == NULL) {
return ENOMEM;
}
NANOARROW_RETURN_NOT_OK(ArrowBufferResize(out, size, 0));
memcpy(out->data, data, size);

out->size_bytes = out->capacity_bytes = (int64_t)size;
out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size);
return out->data ? NANOARROW_OK : ENOMEM;
// don't deallocate yet, just wipe the builder's current Message
flatcc_builder_reset(&private->builder);
return NANOARROW_OK;
}
11 changes: 2 additions & 9 deletions src/nanoarrow/ipc/encoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,18 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/array.h>
#include <arrow/c/bridge.h>
#include <arrow/compute/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>
#include <arrow/util/key_value_metadata.h>
#include <gtest/gtest.h>

#include "flatcc/flatcc_builder.h"
#include "nanoarrow/nanoarrow.hpp"
#include "nanoarrow/nanoarrow_ipc.hpp"

using namespace arrow;

// Copied from encoder.c so we can test the internal state
extern "C" {
struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
struct ArrowBuffer buffers, nodes;
struct ArrowBuffer buffers;
struct ArrowBuffer nodes;
};
}

Expand Down
18 changes: 13 additions & 5 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,10 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
/// initialized using ArrowIpcEncoderInit(), and released with
/// ArrowIpcEncoderReset().
struct ArrowIpcEncoder {
/// \brief Compression to encode in the next RecordBatch message
/// \brief Compression to encode in the next RecordBatch message.
enum ArrowIpcCompressionType codec;

/// \brief Finalized body length of the most recently encoded RecordBatch message
int64_t body_length;

/// \brief Callback invoked against each buffer to be encoded.
/// \brief Callback invoked against each buffer to be encoded
///
/// Encoding of buffers is left as a callback to accommodate dissociated data storage.
/// One implementation of this callback might copy all buffers into a contiguous body
Expand All @@ -420,17 +417,28 @@ struct ArrowIpcEncoder {
/// \brief Pointer to arbitrary data used by encode_buffer()
void* encode_buffer_state;

/// \brief Finalized body length of the most recently encoded RecordBatch message
///
/// (This is initially 0 and encode_buffer() is expected to update it. After all
/// buffers are encoded, this will be written to the RecordBatch's .bodyLength)
int64_t body_length;

/// \brief Private resources managed by this library
void* private_data;
};

/// \brief Initialize an encoder
///
/// If NANOARROW_OK is returned, the caller must call ArrowIpcEncoderReset()
/// to release resources allocated by this function.
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder);

/// \brief Release all resources attached to an encoder
void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder);

/// \brief Finalize the most recently encoded message to a buffer
///
/// The output buffer will be resized and the encoded message copied in.
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* out);
/// @}
Expand Down

0 comments on commit 834a2f1

Please sign in to comment.