Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add IPC writer scaffolding #564

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 27 additions & 40 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ if(NANOARROW_IPC)
endif()

if(NOT NANOARROW_BUNDLE)
set(NANOARROW_IPC_BUILD_SOURCES src/nanoarrow/ipc/decoder.c
src/nanoarrow/ipc/reader.c)
set(NANOARROW_IPC_BUILD_SOURCES
src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
src/nanoarrow/ipc/reader.c)
endif()

add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
Expand Down Expand Up @@ -418,51 +419,37 @@ if(NANOARROW_BUILD_TESTS)
endif()

enable_testing()

add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/ipc/decoder_test.cc)
add_executable(nanoarrow_ipc_reader_test src/nanoarrow/ipc/reader_test.cc)
add_executable(nanoarrow_ipc_files_test src/nanoarrow/ipc/files_test.cc)
add_executable(nanoarrow_ipc_hpp_test src/nanoarrow/ipc/ipc_hpp_test.cc)
include(GoogleTest)

if(NANOARROW_CODE_COVERAGE)
target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
target_link_options(ipc_coverage_config INTERFACE --coverage)
target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
endif()
target_link_libraries(nanoarrow_ipc_decoder_test
nanoarrow_ipc
nanoarrow
flatccrt
${NANOARROW_ARROW_TARGET}
gtest_main
ipc_coverage_config)
target_link_libraries(nanoarrow_ipc_reader_test
nanoarrow_ipc
nanoarrow
flatccrt
gtest_main
ipc_coverage_config)
target_link_libraries(nanoarrow_ipc_files_test
nanoarrow_ipc
nanoarrow
flatccrt
${NANOARROW_ARROW_TARGET}
nlohmann_json
ZLIB::ZLIB
gtest_main
ipc_coverage_config)
target_link_libraries(nanoarrow_ipc_hpp_test
nanoarrow_ipc
nanoarrow
${NANOARROW_ARROW_TARGET}
gtest_main
ipc_coverage_config)

include(GoogleTest)
gtest_discover_tests(nanoarrow_ipc_decoder_test)
gtest_discover_tests(nanoarrow_ipc_reader_test)
gtest_discover_tests(nanoarrow_ipc_files_test)
gtest_discover_tests(nanoarrow_ipc_hpp_test)
foreach(name
decoder
encoder
reader
files
ipc_hpp)
add_executable(nanoarrow_ipc_${name}_test src/nanoarrow/ipc/${name}_test.cc)

target_link_libraries(nanoarrow_ipc_${name}_test
nanoarrow_ipc
nanoarrow
${NANOARROW_ARROW_TARGET}
gtest_main
ipc_coverage_config)

if(NOT (name MATCHES "_hpp_"))
target_link_libraries(nanoarrow_ipc_${name}_test flatccrt)
endif()

gtest_discover_tests(nanoarrow_ipc_${name}_test)
endforeach()

target_link_libraries(nanoarrow_ipc_files_test nlohmann_json ZLIB::ZLIB)
endif()

if(NANOARROW_DEVICE)
Expand Down
11 changes: 0 additions & 11 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,6 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
return NANOARROW_OK;
}

static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
uint32_t check = 1;
char first_byte;
memcpy(&first_byte, &check, sizeof(char));
if (first_byte) {
return NANOARROW_IPC_ENDIANNESS_LITTLE;
} else {
return NANOARROW_IPC_ENDIANNESS_BIG;
}
}

#if NANOARROW_IPC_USE_STDATOMIC
struct ArrowIpcSharedBufferPrivate {
struct ArrowBuffer src;
Expand Down
14 changes: 1 addition & 13 deletions src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@

using namespace arrow;

// Copied from nanoarrow_ipc.c so we can test the internal state
// of the decoder
// Copied from decoder.c so we can test the internal state
extern "C" {
struct ArrowIpcField {
struct ArrowArrayView* array_view;
Expand All @@ -51,17 +50,6 @@ struct ArrowIpcDecoderPrivate {
};
}

static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
uint32_t check = 1;
char first_byte;
memcpy(&first_byte, &check, sizeof(char));
if (first_byte) {
return NANOARROW_IPC_ENDIANNESS_LITTLE;
} else {
return NANOARROW_IPC_ENDIANNESS_BIG;
}
}

TEST(NanoarrowIpcCheckRuntime, CheckRuntime) {
EXPECT_EQ(ArrowIpcCheckRuntime(nullptr), NANOARROW_OK);
}
Expand Down
108 changes: 108 additions & 0 deletions src/nanoarrow/ipc/encoder.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

// For thread safe shared buffers we need C11 + stdatomic.h
// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override
// automatic detection
#if !defined(NANOARROW_IPC_USE_STDATOMIC)
#define NANOARROW_IPC_USE_STDATOMIC 0

// Check for C11
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L

// Check for GCC 4.8, which doesn't include stdatomic.h but does
// not define __STDC_NO_ATOMICS__
#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5

#if !defined(__STDC_NO_ATOMICS__)
#include <stdatomic.h>
#undef NANOARROW_IPC_USE_STDATOMIC
#define NANOARROW_IPC_USE_STDATOMIC 1
#endif
#endif
#endif

#endif
bkietz marked this conversation as resolved.
Show resolved Hide resolved

#include "nanoarrow/ipc/flatcc_generated.h"
#include "nanoarrow/nanoarrow.h"
#include "nanoarrow/nanoarrow_ipc.h"

// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA
#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA)
#define ENODATA 120
#endif
bkietz marked this conversation as resolved.
Show resolved Hide resolved

#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)

#define FLATCC_RETURN_UNLESS_0(x) \
if (ns(x) != 0) return ENOMEM;

struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
struct ArrowBuffer buffers, nodes;
bkietz marked this conversation as resolved.
Show resolved Hide resolved
};

ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder);
bkietz marked this conversation as resolved.
Show resolved Hide resolved
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
encoder->encode_buffer = NULL;
encoder->encode_buffer_state = NULL;
encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
ArrowBufferInit(&private->buffers);
ArrowBufferInit(&private->nodes);
if (flatcc_builder_init(&private->builder) == -1) {
return ESPIPE;
bkietz marked this conversation as resolved.
Show resolved Hide resolved
}
return NANOARROW_OK;
}

void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder && encoder->private_data);
bkietz marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
flatcc_builder_clear(&private->builder);
ArrowBufferReset(&private->nodes);
ArrowBufferReset(&private->buffers);
ArrowFree(private);
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
}

ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* out) {
NANOARROW_DCHECK(encoder && encoder->private_data && out);
bkietz marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
ArrowBufferReset(out);
bkietz marked this conversation as resolved.
Show resolved Hide resolved
size_t size = flatcc_builder_get_buffer_size(&private->builder);
if (size == 0) {
// Finalizing an empty flatcc_builder_t triggers an assertion
return NANOARROW_OK;
}

out->size_bytes = out->capacity_bytes = (int64_t)size;
out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size);
return out->data ? NANOARROW_OK : ENOMEM;
bkietz marked this conversation as resolved.
Show resolved Hide resolved
}
69 changes: 69 additions & 0 deletions src/nanoarrow/ipc/encoder_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <arrow/array.h>
#include <arrow/c/bridge.h>
#include <arrow/compute/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>
#include <arrow/util/key_value_metadata.h>
bkietz marked this conversation as resolved.
Show resolved Hide resolved
bkietz marked this conversation as resolved.
Show resolved Hide resolved
#include <gtest/gtest.h>

#include "flatcc/flatcc_builder.h"
#include "nanoarrow/nanoarrow.hpp"
#include "nanoarrow/nanoarrow_ipc.hpp"

using namespace arrow;

// Copied from encoder.c so we can test the internal state
extern "C" {
struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
struct ArrowBuffer buffers, nodes;
};
}

TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
nanoarrow::ipc::UniqueEncoder encoder;

EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);

EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
EXPECT_EQ(encoder->body_length, 0);
EXPECT_EQ(encoder->encode_buffer, nullptr);
EXPECT_EQ(encoder->encode_buffer_state, nullptr);

auto* priv = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
ASSERT_NE(priv, nullptr);
for (auto* b : {&priv->buffers, &priv->nodes}) {
// Buffers are empty but initialized with the default allocator
EXPECT_EQ(b->size_bytes, 0);

auto default_allocator = ArrowBufferAllocatorDefault();
EXPECT_EQ(memcmp(&b->allocator, &default_allocator, sizeof(b->allocator)), 0);
}

// Empty buffer works
nanoarrow::UniqueBuffer buffer;
EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()), NANOARROW_OK);
EXPECT_EQ(buffer->size_bytes, 0);

// Append a string (finalizing an empty buffer is an error for flatcc_builder_t)
EXPECT_NE(flatcc_builder_create_string_str(&priv->builder, "hello world"), 0);
EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()), NANOARROW_OK);
EXPECT_GT(buffer->size_bytes, sizeof("hello world"));
}
54 changes: 54 additions & 0 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
#define ArrowIpcArrayStreamReaderInit \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
#define ArrowIpcEncoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderInit)
#define ArrowIpcEncoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderReset)
#define ArrowIpcEncoderFinalizeBuffer \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)

#endif

Expand Down Expand Up @@ -117,6 +121,18 @@ enum ArrowIpcCompressionType {
/// \brief Checks the nanoarrow runtime to make sure the run/build versions match
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);

/// \brief Get the endianness of the current runtime
static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
uint32_t check = 1;
char first_byte;
memcpy(&first_byte, &check, sizeof(char));
if (first_byte) {
return NANOARROW_IPC_ENDIANNESS_LITTLE;
} else {
return NANOARROW_IPC_ENDIANNESS_BIG;
}
}

/// \brief A structure representing a reference-counted buffer that may be passed to
/// ArrowIpcDecoderDecodeArrayFromShared().
struct ArrowIpcSharedBuffer {
Expand Down Expand Up @@ -379,6 +395,44 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
struct ArrowIpcArrayStreamReaderOptions* options);

/// \brief Encoder for Arrow IPC messages
///
/// This structure is intended to be allocated by the caller,
/// initialized using ArrowIpcEncoderInit(), and released with
/// ArrowIpcEncoderReset().
struct ArrowIpcEncoder {
/// \brief Compression to encode in the next RecordBatch message
enum ArrowIpcCompressionType codec;

/// \brief Finalized body length of the most recently encoded RecordBatch message
int64_t body_length;

/// \brief Callback invoked against each buffer to be encoded.
///
/// Encoding of buffers is left as a callback to accommodate dissociated data storage.
/// One implementation of this callback might copy all buffers into a contiguous body
/// for use in an arrow IPC stream, another implementation might store offsets and
/// lengths relative to a known arena.
ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
struct ArrowIpcEncoder* encoder, int64_t* offset,
int64_t* length, struct ArrowError* error);

/// \brief Pointer to arbitrary data used by encode_buffer()
void* encode_buffer_state;

/// \brief Private resources managed by this library
void* private_data;
};

/// \brief Initialize an encoder
bkietz marked this conversation as resolved.
Show resolved Hide resolved
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder);

/// \brief Release all resources attached to an encoder
void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder);

/// \brief Finalize the most recently encoded message to a buffer
bkietz marked this conversation as resolved.
Show resolved Hide resolved
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* out);
/// @}

#ifdef __cplusplus
Expand Down
Loading
Loading