Skip to content

Commit

Permalink
fix: scan out of range
Browse files Browse the repository at this point in the history
  • Loading branch information
chenkovsky committed Jan 5, 2025
1 parent 45fde4c commit bdc9fc5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 6 deletions.
12 changes: 12 additions & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ impl ReadBatchParams {
)),
}
}

pub fn to_offsets_total(&self, total: u32) -> Result<PrimitiveArray<UInt32Type>> {
match self {
Self::Indices(indices) => Ok(indices.clone()),
Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
r.start as u32..r.end as u32,
))),
Self::RangeFull => Ok(UInt32Array::from(Vec::from_iter(0 as u32..total))),
Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
Self::RangeFrom(r) => Ok(UInt32Array::from(Vec::from_iter(r.start as u32..total))),
}
}
}

#[cfg(test)]
Expand Down
72 changes: 66 additions & 6 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1981,12 +1981,7 @@ impl FragmentReader {
let merged = if self.with_row_addr as usize + self.with_row_id as usize
== self.output_schema.fields.len()
{
let selected_rows = params
.slice(0, total_num_rows as usize)
.unwrap()
.to_offsets()
.unwrap()
.len();
let selected_rows = params.to_offsets_total(total_num_rows).unwrap().len();
let tasks = (0..selected_rows)
.step_by(batch_size as usize)
.map(move |offset| {
Expand Down Expand Up @@ -2389,6 +2384,71 @@ mod tests {
}
}

#[tokio::test]
async fn test_rowid_rowaddr_only() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
// Creates 400 rows in 10 fragments
let mut dataset = create_dataset(test_uri, LanceFileVersion::Legacy).await;
// Delete last 20 rows in first fragment
dataset.delete("i >= 20").await.unwrap();
// Last fragment has 20 rows but 40 addressable rows
let fragment = &dataset.get_fragments()[0];
assert_eq!(fragment.metadata.num_rows().unwrap(), 20);

// Test with take_range (all rows addressable)
for (with_row_id, with_row_address) in [(false, true), (true, false), (true, true)] {
let reader = fragment
.open(
&fragment.schema().project::<&str>(&[]).unwrap(),
FragReadConfig::default()
.with_row_id(with_row_id)
.with_row_address(with_row_address),
None,
)
.await
.unwrap();
for valid_range in [0..40, 20..40] {
reader
.take_range(valid_range, 100)
.unwrap()
.buffered(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
}
for invalid_range in [0..41, 41..42] {
assert!(reader.take_range(invalid_range, 100).is_err());
}
}

// Test with read_range (only non-deleted rows addressable)
for (with_row_id, with_row_address) in [(false, true), (true, false), (true, true)] {
let reader = fragment
.open(
&fragment.schema().project::<&str>(&[]).unwrap(),
FragReadConfig::default()
.with_row_id(with_row_id)
.with_row_address(with_row_address),
None,
)
.await
.unwrap();
for valid_range in [0..20, 0..10, 10..20] {
reader
.read_range(valid_range, 100)
.unwrap()
.buffered(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
}
for invalid_range in [0..21, 21..22] {
assert!(reader.read_range(invalid_range, 100).is_err());
}
}
}

#[rstest]
#[tokio::test]
async fn test_fragment_take_range_deletions(
Expand Down

0 comments on commit bdc9fc5

Please sign in to comment.