Skip to content

Commit

Permalink
feat: add Footer decoding (#598)
Browse files Browse the repository at this point in the history
- Adds ArrowIpcDecoderPeekFooter(), ArrowIpcDecoderVerifyFooter(), and
ArrowIpcDecoderDecodeFooter()
- Uses these to read IPC files in the integration test executable
  • Loading branch information
bkietz authored Aug 30, 2024
1 parent 3298ebc commit cf38896
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 47 deletions.
66 changes: 48 additions & 18 deletions src/nanoarrow/integration/ipc_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ int main(int argc, char** argv) try {
}

struct File {
File(FILE* file) : file_{file} {}
File() = default;

~File() {
if (file_ != nullptr) {
fclose(file_);
Expand Down Expand Up @@ -166,35 +169,62 @@ struct MaterializedArrayStream {
// Footer).
File ipc_file;
NANOARROW_RETURN_NOT_OK(ipc_file.open(path, "rb", error));
return FromIpcFile(ipc_file, error);
}
auto bytes = ipc_file.read();

ArrowErrorCode FromIpcFile(FILE* ipc_file, struct ArrowError* error) {
char prefix[sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC)] = {};
if (fread(&prefix, 1, sizeof(prefix), ipc_file) < sizeof(prefix)) {
ArrowErrorSet(error, "Expected file of more than %lu bytes, got %ld",
sizeof(prefix), ftell(ipc_file));
auto min_size = sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC) + sizeof(int32_t) +
strlen(NANOARROW_IPC_FILE_PADDED_MAGIC);
if (bytes.size() < min_size) {
ArrowErrorSet(error, "Expected file of more than %lu bytes, got %ld", min_size,
bytes.size());
return EINVAL;
}

if (memcmp(&prefix, NANOARROW_IPC_FILE_PADDED_MAGIC, sizeof(prefix)) != 0) {
if (memcmp(bytes.data(), NANOARROW_IPC_FILE_PADDED_MAGIC,
sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC)) != 0) {
ArrowErrorSet(error, "File did not begin with 'ARROW1\\0\\0'");
return EINVAL;
}

nanoarrow::ipc::UniqueInputStream input_stream;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcInputStreamInitFile(input_stream.get(), ipc_file,
/*close_on_release=*/false),
error);
nanoarrow::ipc::UniqueDecoder decoder;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowIpcDecoderInit(decoder.get()), error);
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyFooter(
decoder.get(), {{bytes.data()}, static_cast<int64_t>(bytes.size())}, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeFooter(
decoder.get(), {{bytes.data()}, static_cast<int64_t>(bytes.size())}, error));

nanoarrow::UniqueArrayStream array_stream;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcArrayStreamReaderInit(array_stream.get(), input_stream.get(),
/*options=*/nullptr),
error);
ArrowSchemaDeepCopy(&decoder->footer->schema, schema.get()), error);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderSetSchema(decoder.get(), &decoder->footer->schema, error));
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcDecoderSetEndianness(decoder.get(), decoder->endianness), error);

nanoarrow::UniqueBuffer record_batch_blocks;
ArrowBufferMove(&decoder->footer->record_batch_blocks, record_batch_blocks.get());

for (int i = 0;
i < record_batch_blocks->size_bytes / sizeof(struct ArrowIpcFileBlock); i++) {
const auto& block =
reinterpret_cast<struct ArrowIpcFileBlock*>(record_batch_blocks->data)[i];
struct ArrowBufferView metadata_view = {
{bytes.data() + block.offset},
block.metadata_length,
};
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeHeader(decoder.get(), metadata_view, error));

return From(array_stream.get(), error);
struct ArrowBufferView body_view = {
{metadata_view.data.as_uint8 + metadata_view.size_bytes},
block.body_length,
};
nanoarrow::UniqueArray batch;
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeArray(decoder.get(), body_view, -1, batch.get(),
NANOARROW_VALIDATION_LEVEL_FULL, error));
batches.push_back(std::move(batch));
}

return NANOARROW_OK;
}

ArrowErrorCode Write(struct ArrowIpcOutputStream* output_stream, bool write_file,
Expand Down
167 changes: 144 additions & 23 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
// at the beginning of every message header.
static const int32_t kMessageHeaderPrefixSize = 8;

#define NANOARROW_IPC_MAGIC "ARROW1"

// Internal representation of a parsed "Field" from flatbuffers. This
// represents a field in a depth-first walk of column arrays and their
// children.
Expand Down Expand Up @@ -95,6 +97,8 @@ struct ArrowIpcDecoderPrivate {
int64_t n_buffers;
// A pointer to the last flatbuffers message.
const void* last_message;
// Storage for a Footer
struct ArrowIpcFooter footer;
};

ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
Expand Down Expand Up @@ -236,6 +240,7 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) {

memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate));
private_data->system_endianness = ArrowIpcSystemEndianness();
ArrowIpcFooterInit(&private_data->footer);
decoder->private_data = private_data;
return NANOARROW_OK;
}
Expand All @@ -256,6 +261,8 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
private_data->n_fields = 0;
}

ArrowIpcFooterReset(&private_data->footer);

ArrowFree(private_data);
memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
}
Expand Down Expand Up @@ -959,6 +966,8 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decode
decoder->codec = 0;
decoder->header_size_bytes = 0;
decoder->body_size_bytes = 0;
decoder->footer = NULL;
ArrowIpcFooterReset(&private_data->footer);
private_data->last_message = NULL;
}

Expand Down Expand Up @@ -1053,6 +1062,85 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcDecoderPeekFooter(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowIpcDecoderResetHeaderInfo(decoder);
if (data.size_bytes < (int)strlen(NANOARROW_IPC_MAGIC) + (int)sizeof(int32_t)) {
ArrowErrorSet(error,
"Expected data of at least 10 bytes but only %" PRId64
" bytes are available",
data.size_bytes);
return ESPIPE;
}

const char* data_end = data.data.as_char + data.size_bytes;
const char* magic = data_end - strlen(NANOARROW_IPC_MAGIC);
const char* footer_size_data = magic - sizeof(int32_t);

if (memcmp(magic, NANOARROW_IPC_MAGIC, strlen(NANOARROW_IPC_MAGIC)) != 0) {
ArrowErrorSet(error, "Expected file to end with ARROW1 but got %s", data_end);
return EINVAL;
}

int32_t footer_size;
memcpy(&footer_size, footer_size_data, sizeof(footer_size));
if (private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG) {
footer_size = bswap32(footer_size);
}

if (footer_size < 0) {
ArrowErrorSet(error, "Expected footer size > 0 but found footer size of %d bytes",
footer_size);
return EINVAL;
}

decoder->header_size_bytes = footer_size;
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcDecoderVerifyFooter(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekFooter(decoder, data, error));

// Check that data contains at least the entire footer (return ESPIPE to signal
// that reading more data may help).
int32_t footer_and_size_and_magic_size =
decoder->header_size_bytes + sizeof(int32_t) + strlen(NANOARROW_IPC_MAGIC);
if (data.size_bytes < footer_and_size_and_magic_size) {
ArrowErrorSet(error,
"Expected >= %d bytes of data but only %" PRId64
" bytes are in the buffer",
footer_and_size_and_magic_size, data.size_bytes);
return ESPIPE;
}

const uint8_t* footer_data =
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;

// Run flatbuffers verification
if (ns(Footer_verify_as_root(footer_data, decoder->header_size_bytes) !=
flatcc_verify_ok)) {
ArrowErrorSet(error, "Footer flatbuffer verification failed");
return EINVAL;
}

// Read some basic information from the message
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));
if (ns(Footer_schema(footer)) == NULL) {
ArrowErrorSet(error, "Footer has no schema");
return EINVAL;
}

decoder->metadata_version = ns(Footer_version(footer));
decoder->body_size_bytes = 0;
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
Expand Down Expand Up @@ -1126,6 +1214,29 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
return NANOARROW_OK;
}

static ArrowErrorCode ArrowIpcDecoderDecodeSchemaImpl(ns(Schema_table_t) schema,
struct ArrowSchema* out,
struct ArrowError* error) {
ArrowSchemaInit(out);
// Top-level batch schema is typically non-nullable
out->flags = 0;

ns(Field_vec_t) fields = ns(Schema_fields(schema));
int64_t n_fields = ns(Schema_vec_len(fields));

ArrowErrorCode result = ArrowSchemaSetTypeStruct(out, n_fields);
if (result != NANOARROW_OK) {
ArrowErrorSet(error, "Failed to allocate struct schema with %" PRId64 " children",
n_fields);
return result;
}

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(out, fields, error));
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderSetMetadata(out, ns(Schema_custom_metadata(schema)), error));
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
struct ArrowSchema* out,
struct ArrowError* error) {
Expand All @@ -1138,37 +1249,47 @@ ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
return EINVAL;
}

ns(Schema_table_t) schema = (ns(Schema_table_t))private_data->last_message;

ns(Field_vec_t) fields = ns(Schema_fields(schema));
int64_t n_fields = ns(Schema_vec_len(fields));

struct ArrowSchema tmp;
ArrowSchemaInit(&tmp);
int result = ArrowSchemaSetTypeStruct(&tmp, n_fields);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
ArrowErrorSet(error, "Failed to allocate struct schema with %" PRId64 " children",
n_fields);
return result;
}

// Top-level batch schema is typically non-nullable
tmp.flags = 0;
ArrowErrorCode result = ArrowIpcDecoderDecodeSchemaImpl(
(ns(Schema_table_t))private_data->last_message, &tmp, error);

result = ArrowIpcDecoderSetChildren(&tmp, fields, error);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
return result;
}
ArrowSchemaMove(&tmp, out);
return NANOARROW_OK;
}

result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
return result;
}
ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowSchemaMove(&tmp, out);
int32_t footer_and_size_and_magic_size =
decoder->header_size_bytes + sizeof(int32_t) + strlen(NANOARROW_IPC_MAGIC);
const uint8_t* footer_data =
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchemaImpl(
ns(Footer_schema(footer)), &private_data->footer.schema, error));

ns(Block_vec_t) blocks = ns(Footer_recordBatches(footer));
int64_t n = ns(Block_vec_len(blocks));
NANOARROW_RETURN_NOT_OK(ArrowBufferResize(&private_data->footer.record_batch_blocks,
sizeof(struct ArrowIpcFileBlock) * n,
/*shrink_to_fit=*/0));
struct ArrowIpcFileBlock* record_batches =
(struct ArrowIpcFileBlock*)private_data->footer.record_batch_blocks.data;
for (int64_t i = 0; i < n; i++) {
record_batches[i].offset = blocks[i].offset;
record_batches[i].metadata_length = blocks[i].metaDataLength;
record_batches[i].body_length = blocks[i].bodyLength;
}

decoder->footer = &private_data->footer;
return NANOARROW_OK;
}

Expand Down
Loading

0 comments on commit cf38896

Please sign in to comment.