From fb847073098daaf224a096d98f7ed5fb5a0c8cbf Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 13 Jun 2024 07:17:54 -0700 Subject: [PATCH] fix: incorrect logic when decoding manifest from last block (#2464) If the size of the manifest file is greater than 4KiB but the manifest itself is less than 4KiB then we can decode it from memory. However, we look at the wrong location in the block when decoding the manifest and panic. --- rust/lance/src/dataset.rs | 41 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 7b1cf500a2..7a8a627568 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -338,8 +338,11 @@ impl Dataset { // If manifest is in the last block, we can decode directly from memory. let manifest_size = object_reader.size().await?; let mut manifest = if manifest_size - offset <= last_block.len() { - let message_len = LittleEndian::read_u32(&last_block[offset..offset + 4]) as usize; - let message_data = &last_block[offset + 4..offset + 4 + message_len]; + let manifest_len = manifest_size - offset; + let offset_in_block = last_block.len() - manifest_len; + let message_len = + LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize; + let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len]; Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?) } else { read_struct(object_reader.as_ref(), offset).await @@ -1329,7 +1332,7 @@ mod tests { DictionaryArray, Float32Array, Int32Array, Int64Array, Int8Array, Int8DictionaryArray, RecordBatchIterator, StringArray, UInt16Array, UInt32Array, }; - use arrow_array::{FixedSizeListArray, StructArray}; + use arrow_array::{FixedSizeListArray, Int16Array, Int16DictionaryArray, StructArray}; use arrow_ord::sort::sort_to_indices; use arrow_schema::{ DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema, @@ -3425,6 +3428,38 @@ mod tests { assert!(!dataset_dir.exists()); } + #[tokio::test] + async fn test_manifest_partially_fits() { + // This regresses a bug that occurred when the manifest file was over 4KiB but the manifest + // itself was less than 4KiB (due to a dictionary). 4KiB is important here because that's the + // block size we use when reading the "last block" + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "x", + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)), + false, + )])); + let dictionary = Arc::new(StringArray::from_iter_values( + (0..1000).map(|i| i.to_string()), + )); + let indices = Int16Array::from_iter_values(0..1000); + let batches = vec![RecordBatch::try_new( + schema.clone(), + vec![Arc::new( + Int16DictionaryArray::try_new(indices, dictionary.clone()).unwrap(), + )], + ) + .unwrap()]; + + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + Dataset::write(batches, test_uri, None).await.unwrap(); + + let dataset = Dataset::open(test_uri).await.unwrap(); + assert_eq!(1000, dataset.count_rows(None).await.unwrap()); + } + #[tokio::test] async fn test_dataset_uri_roundtrips() { let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(