From 57acfbe361d81c4b57ccef28c4f0a6e28195b75d Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 20 Sep 2024 16:22:49 -0500 Subject: [PATCH] fix: Ignore empty (but present) union validity bitmaps from before 1.0 (#630) For metadata versions before V5, ignore the extra validity bitmap. If the bitmap is non-empty, we raise an error (which is the same bail used in arrow-C++ and -java). --- src/nanoarrow/ipc/decoder.c | 48 ++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c index c6eb2d032..5be3794e3 100644 --- a/src/nanoarrow/ipc/decoder.c +++ b/src/nanoarrow/ipc/decoder.c @@ -95,6 +95,8 @@ struct ArrowIpcDecoderPrivate { // The number of buffers that future RecordBatch messages must have to match the schema // that has been set. int64_t n_buffers; + // The number of union fields in the Schema. + int64_t n_union_fields; // A pointer to the last flatbuffers message. const void* last_message; // Storage for a Footer @@ -261,6 +263,8 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) { private_data->n_fields = 0; } + private_data->n_union_fields = 0; + ArrowIpcFooterReset(&private_data->footer); ArrowFree(private_data); @@ -924,9 +928,16 @@ static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder* decode return EINVAL; } - if ((n_buffers + 1) != private_data->n_buffers) { + int64_t n_expected_buffers = private_data->n_buffers; + if (decoder->metadata_version < NANOARROW_IPC_METADATA_VERSION_V5) { + // Unions had null buffers before arrow 1.0, so expect one extra buffer per union + // field + n_expected_buffers += private_data->n_union_fields; + } + + if ((n_buffers + 1) != n_expected_buffers) { ArrowErrorSet(error, "Expected %" PRId64 " buffers in message but found %" PRId64, - private_data->n_buffers - 1, n_buffers); + n_expected_buffers - 1, n_buffers); return EINVAL; } @@ -1179,14 +1190,14 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, switch (decoder->metadata_version) { case ns(MetadataVersion_V5): + case ns(MetadataVersion_V4): break; case ns(MetadataVersion_V1): case ns(MetadataVersion_V2): case ns(MetadataVersion_V3): - case ns(MetadataVersion_V4): - ArrowErrorSet(error, "Expected metadata version V5 but found %s", + ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s", ns(MetadataVersion_name(ns(Message_version(message))))); - break; + return EINVAL; default: ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)", decoder->metadata_version); @@ -1307,7 +1318,7 @@ static void ArrowIpcDecoderCountFields(struct ArrowSchema* schema, int64_t* n_fi static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields, struct ArrowArrayView* array_view, struct ArrowArray* array, int64_t* n_fields, - int64_t* n_buffers) { + int64_t* n_buffers, int64_t* n_union_fields) { struct ArrowIpcField* field = fields + (*n_fields); field->array_view = array_view; field->array = array; @@ -1316,12 +1327,14 @@ static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields, for (int i = 0; i < NANOARROW_MAX_FIXED_BUFFERS; i++) { *n_buffers += array_view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE; } + *n_union_fields += array_view->storage_type == NANOARROW_TYPE_SPARSE_UNION || + array_view->storage_type == NANOARROW_TYPE_DENSE_UNION; *n_fields += 1; for (int64_t i = 0; i < array_view->n_children; i++) { ArrowIpcDecoderInitFields(fields, array_view->children[i], array->children[i], - n_fields, n_buffers); + n_fields, n_buffers, n_union_fields); } } @@ -1334,6 +1347,7 @@ ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder, // Reset previously allocated schema-specific resources private_data->n_buffers = 0; private_data->n_fields = 0; + private_data->n_union_fields = 0; ArrowArrayViewReset(&private_data->array_view); if (private_data->array.release != NULL) { ArrowArrayRelease(&private_data->array); @@ -1368,7 +1382,8 @@ ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder, // Init field information and calculate starting buffer offset for each int64_t field_i = 0; ArrowIpcDecoderInitFields(private_data->fields, &private_data->array_view, - &private_data->array, &field_i, &private_data->n_buffers); + &private_data->array, &field_i, &private_data->n_buffers, + &private_data->n_union_fields); return NANOARROW_OK; } @@ -1604,6 +1619,7 @@ struct ArrowIpcArraySetter { int64_t body_size_bytes; struct ArrowIpcBufferSource src; struct ArrowIpcBufferFactory factory; + enum ArrowIpcMetadataVersion version; }; static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset, @@ -1691,6 +1707,21 @@ static int ArrowIpcDecoderWalkSetArrayView(struct ArrowIpcArraySetter* setter, array_view->null_count = ns(FieldNode_null_count(field)); setter->field_i += 1; + if (array_view->storage_type == NANOARROW_TYPE_SPARSE_UNION || + array_view->storage_type == NANOARROW_TYPE_DENSE_UNION) { + if (setter->version < NANOARROW_IPC_METADATA_VERSION_V5) { + ns(Buffer_struct_t) buffer = + ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i)); + if (ns(Buffer_length(buffer)) != 0) { + ArrowErrorSet(error, + "Cannot read pre-1.0.0 Union array with top-level validity bitmap"); + return EINVAL; + } + // skip the empty validity bitmap + setter->buffer_i += 1; + } + } + for (int i = 0; i < NANOARROW_MAX_FIXED_BUFFERS; i++) { if (array_view->layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE) { break; @@ -1803,6 +1834,7 @@ static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal( setter.factory = factory; setter.src.codec = decoder->codec; setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder); + setter.version = decoder->metadata_version; // The flatbuffers FieldNode doesn't count the root struct so we have to loop over the // children ourselves