diff --git a/src/nanoarrow/ipc/reader.c b/src/nanoarrow/ipc/reader.c
index d8d61f768..392aec832 100644
--- a/src/nanoarrow/ipc/reader.c
+++ b/src/nanoarrow/ipc/reader.c
@@ -28,6 +28,10 @@
 #define ENODATA 120
 #endif
 
+// Sentinel value to indicate that we haven't read a message yet
+// and don't know the number of header prefix bytes to expect.
+static const int32_t kExpectedHeaderPrefixSizeNotSet = -1;
+
 void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
                              struct ArrowIpcInputStream* dst) {
   memcpy(dst, src, sizeof(struct ArrowIpcInputStream));
@@ -186,6 +190,7 @@ struct ArrowIpcArrayStreamReaderPrivate {
   int64_t field_index;
   struct ArrowBuffer header;
   struct ArrowBuffer body;
+  int32_t expected_header_prefix_size;
   struct ArrowError error;
 };
 
@@ -230,7 +235,7 @@ static int ArrowIpcArrayStreamReaderNextHeader(
     // propagated higher (e.g., if the stream is empty and there's no schema message)
     ArrowErrorSet(&private_data->error, "No data available on stream");
     return ENODATA;
-  } else if (bytes_read == 4) {
+  } else if (private_data->expected_header_prefix_size == 4) {
     // Special case very, very old IPC streams that used 0x00000000 as the
     // end-of-stream indicator.
     uint32_t last_four_bytes = 0;
@@ -259,6 +264,14 @@ static int ArrowIpcArrayStreamReaderNextHeader(
   NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(
       &private_data->decoder, input_view, &prefix_size_bytes, &private_data->error));
 
+  if (private_data->expected_header_prefix_size != kExpectedHeaderPrefixSizeNotSet &&
+      prefix_size_bytes != private_data->expected_header_prefix_size) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected prefix %d prefix header bytes but found %d",
+                  (int)private_data->expected_header_prefix_size, (int)prefix_size_bytes);
+    return EINVAL;
+  }
+
   // Legacy streams are missing the 0xFFFFFFFF at the start of the message. The
   // decoder can handle this; however, verification will fail because flatbuffers
   // must be 8-byte aligned. To handle this case, we prepend the continuation
@@ -300,6 +313,20 @@ static int ArrowIpcArrayStreamReaderNextHeader(
   NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
                                                       &private_data->error));
 
+  // Set the expected number of prefix bytes for reading the next message
+  // if we haven't encountered a message yet.
+  if (private_data->expected_header_prefix_size == kExpectedHeaderPrefixSizeNotSet) {
+    switch (private_data->decoder.metadata_version) {
+      // Earlier versions raise an error in header verification
+      case NANOARROW_IPC_METADATA_VERSION_V4:
+        private_data->expected_header_prefix_size = 4;
+        break;
+      default:
+        private_data->expected_header_prefix_size = 8;
+        break;
+    }
+  }
+
   // Don't decode the message if it's of the wrong type (because the error message
   // is better communicated by the caller)
   if (private_data->decoder.message_type != message_type) {
@@ -485,6 +512,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
   ArrowBufferInit(&private_data->body);
   private_data->out_schema.release = NULL;
   ArrowIpcInputStreamMove(input_stream, &private_data->input);
+  private_data->expected_header_prefix_size = kExpectedHeaderPrefixSizeNotSet;
 
   if (options != NULL) {
     private_data->field_index = options->field_index;