Skip to content

Commit

Permalink
maybe better errors for legacy streams
Browse files Browse the repository at this point in the history
  • Loading branch information
paleolimbot committed Oct 4, 2024
1 parent b7df511 commit c195982
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion src/nanoarrow/ipc/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#define ENODATA 120
#endif

// Sentinel value to indicate that we haven't read a message yet
// and don't know the number of header prefix bytes to expect.
static const int32_t kExpectedHeaderPrefixSizeNotSet = -1;

void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
struct ArrowIpcInputStream* dst) {
memcpy(dst, src, sizeof(struct ArrowIpcInputStream));
Expand Down Expand Up @@ -186,6 +190,7 @@ struct ArrowIpcArrayStreamReaderPrivate {
int64_t field_index;
struct ArrowBuffer header;
struct ArrowBuffer body;
int32_t expected_header_prefix_size;
struct ArrowError error;
};

Expand Down Expand Up @@ -230,7 +235,7 @@ static int ArrowIpcArrayStreamReaderNextHeader(
// propagated higher (e.g., if the stream is empty and there's no schema message)
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else if (bytes_read == 4) {
} else if (private_data->expected_header_prefix_size == 4) {
// Special case very, very old IPC streams that used 0x00000000 as the
// end-of-stream indicator.
uint32_t last_four_bytes = 0;
Expand Down Expand Up @@ -259,6 +264,14 @@ static int ArrowIpcArrayStreamReaderNextHeader(
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(
&private_data->decoder, input_view, &prefix_size_bytes, &private_data->error));

if (private_data->expected_header_prefix_size != kExpectedHeaderPrefixSizeNotSet &&
prefix_size_bytes != private_data->expected_header_prefix_size) {
ArrowErrorSet(&private_data->error,
"Expected prefix %d prefix header bytes but found %d",
(int)private_data->expected_header_prefix_size, (int)prefix_size_bytes);
return EINVAL;
}

// Legacy streams are missing the 0xFFFFFFFF at the start of the message. The
// decoder can handle this; however, verification will fail because flatbuffers
// must be 8-byte aligned. To handle this case, we prepend the continuation
Expand Down Expand Up @@ -300,6 +313,20 @@ static int ArrowIpcArrayStreamReaderNextHeader(
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
&private_data->error));

// Set the expected number of prefix bytes for reading the next message
// if we haven't encountered a message yet.
if (private_data->expected_header_prefix_size != kExpectedHeaderPrefixSizeNotSet) {
switch (private_data->decoder.metadata_version) {
// Earlier versions raise an in header verification
case NANOARROW_IPC_METADATA_VERSION_V4:
private_data->expected_header_prefix_size = 4;
break;
default:
private_data->expected_header_prefix_size = 8;
break;
}
}

// Don't decode the message if it's of the wrong type (because the error message
// is better communicated by the caller)
if (private_data->decoder.message_type != message_type) {
Expand Down Expand Up @@ -485,6 +512,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
ArrowBufferInit(&private_data->body);
private_data->out_schema.release = NULL;
ArrowIpcInputStreamMove(input_stream, &private_data->input);
private_data->expected_header_prefix_size = kExpectedHeaderPrefixSizeNotSet;

if (options != NULL) {
private_data->field_index = options->field_index;
Expand Down

0 comments on commit c195982

Please sign in to comment.