Skip to content

Commit

Permalink
fix: Ignore empty (but present) union validity bitmaps from before 1.0 (
Browse files Browse the repository at this point in the history
#630)

For metadata versions before V5, ignore the extra validity bitmap. If
the bitmap is non-empty, we raise an error (which is the same bail used
in arrow-C++ and -java).
  • Loading branch information
bkietz authored Sep 20, 2024
1 parent 61c6917 commit 57acfbe
Showing 1 changed file with 40 additions and 8 deletions.
48 changes: 40 additions & 8 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ struct ArrowIpcDecoderPrivate {
// The number of buffers that future RecordBatch messages must have to match the schema
// that has been set.
int64_t n_buffers;
// The number of union fields in the Schema.
int64_t n_union_fields;
// A pointer to the last flatbuffers message.
const void* last_message;
// Storage for a Footer
Expand Down Expand Up @@ -261,6 +263,8 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
private_data->n_fields = 0;
}

private_data->n_union_fields = 0;

ArrowIpcFooterReset(&private_data->footer);

ArrowFree(private_data);
Expand Down Expand Up @@ -924,9 +928,16 @@ static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder* decode
return EINVAL;
}

if ((n_buffers + 1) != private_data->n_buffers) {
int64_t n_expected_buffers = private_data->n_buffers;
if (decoder->metadata_version < NANOARROW_IPC_METADATA_VERSION_V5) {
// Unions had null buffers before arrow 1.0, so expect one extra buffer per union
// field
n_expected_buffers += private_data->n_union_fields;
}

if ((n_buffers + 1) != n_expected_buffers) {
ArrowErrorSet(error, "Expected %" PRId64 " buffers in message but found %" PRId64,
private_data->n_buffers - 1, n_buffers);
n_expected_buffers - 1, n_buffers);
return EINVAL;
}

Expand Down Expand Up @@ -1179,14 +1190,14 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,

switch (decoder->metadata_version) {
case ns(MetadataVersion_V5):
case ns(MetadataVersion_V4):
break;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
case ns(MetadataVersion_V4):
ArrowErrorSet(error, "Expected metadata version V5 but found %s",
ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
ns(MetadataVersion_name(ns(Message_version(message)))));
break;
return EINVAL;
default:
ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
decoder->metadata_version);
Expand Down Expand Up @@ -1307,7 +1318,7 @@ static void ArrowIpcDecoderCountFields(struct ArrowSchema* schema, int64_t* n_fi
static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
struct ArrowArrayView* array_view,
struct ArrowArray* array, int64_t* n_fields,
int64_t* n_buffers) {
int64_t* n_buffers, int64_t* n_union_fields) {
struct ArrowIpcField* field = fields + (*n_fields);
field->array_view = array_view;
field->array = array;
Expand All @@ -1316,12 +1327,14 @@ static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
for (int i = 0; i < NANOARROW_MAX_FIXED_BUFFERS; i++) {
*n_buffers += array_view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
}
*n_union_fields += array_view->storage_type == NANOARROW_TYPE_SPARSE_UNION ||
array_view->storage_type == NANOARROW_TYPE_DENSE_UNION;

*n_fields += 1;

for (int64_t i = 0; i < array_view->n_children; i++) {
ArrowIpcDecoderInitFields(fields, array_view->children[i], array->children[i],
n_fields, n_buffers);
n_fields, n_buffers, n_union_fields);
}
}

Expand All @@ -1334,6 +1347,7 @@ ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
// Reset previously allocated schema-specific resources
private_data->n_buffers = 0;
private_data->n_fields = 0;
private_data->n_union_fields = 0;
ArrowArrayViewReset(&private_data->array_view);
if (private_data->array.release != NULL) {
ArrowArrayRelease(&private_data->array);
Expand Down Expand Up @@ -1368,7 +1382,8 @@ ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
// Init field information and calculate starting buffer offset for each
int64_t field_i = 0;
ArrowIpcDecoderInitFields(private_data->fields, &private_data->array_view,
&private_data->array, &field_i, &private_data->n_buffers);
&private_data->array, &field_i, &private_data->n_buffers,
&private_data->n_union_fields);

return NANOARROW_OK;
}
Expand Down Expand Up @@ -1604,6 +1619,7 @@ struct ArrowIpcArraySetter {
int64_t body_size_bytes;
struct ArrowIpcBufferSource src;
struct ArrowIpcBufferFactory factory;
enum ArrowIpcMetadataVersion version;
};

static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
Expand Down Expand Up @@ -1691,6 +1707,21 @@ static int ArrowIpcDecoderWalkSetArrayView(struct ArrowIpcArraySetter* setter,
array_view->null_count = ns(FieldNode_null_count(field));
setter->field_i += 1;

if (array_view->storage_type == NANOARROW_TYPE_SPARSE_UNION ||
array_view->storage_type == NANOARROW_TYPE_DENSE_UNION) {
if (setter->version < NANOARROW_IPC_METADATA_VERSION_V5) {
ns(Buffer_struct_t) buffer =
ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
if (ns(Buffer_length(buffer)) != 0) {
ArrowErrorSet(error,
"Cannot read pre-1.0.0 Union array with top-level validity bitmap");
return EINVAL;
}
// skip the empty validity bitmap
setter->buffer_i += 1;
}
}

for (int i = 0; i < NANOARROW_MAX_FIXED_BUFFERS; i++) {
if (array_view->layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE) {
break;
Expand Down Expand Up @@ -1803,6 +1834,7 @@ static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal(
setter.factory = factory;
setter.src.codec = decoder->codec;
setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
setter.version = decoder->metadata_version;

// The flatbuffers FieldNode doesn't count the root struct so we have to loop over the
// children ourselves
Expand Down

0 comments on commit 57acfbe

Please sign in to comment.