fix: do not panic when performing a pushdown scan on a multi-data-file fragment (#1873)

The normal scan algorithm is:

```
open fragment reader with projected schema
for batch in batches:
  scan batch
```

The pushdown algorithm is:

```
open fragment with full schema
for batch, simplified_projection in filter(batches):
  projected scan batch(simplified_projection)
```

This means the set of data files that must be read can change from batch to
batch. The pushdown path did not previously account for this, and it panicked
when a batch's simplified projection no longer needed any columns from one of a
fragment's data files. It now skips any data file whose projection becomes
empty (keeping the first data file whenever the row id is requested, since row
ids always come from the first reader).
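
To make the failure concrete: after a `merge`, a fragment can have one data
file holding `id` and another holding `left`. A filter such as `id < 3` may be
fully applied partway through the scan, so later batches project only `left`
and the `id` data file becomes irrelevant. Below is a minimal, self-contained
sketch of that per-batch skip decision (the types here are purely
illustrative, not the real lance reader API):

```
fn main() {
    // One column list per data file: file 0 holds ["id"], file 1 holds
    // ["left"], as after merging two single-column datasets on "id".
    let data_files: Vec<Vec<&str>> = vec![vec!["id"], vec!["left"]];

    // Hypothetical simplified projections emitted by the pushdown filter:
    // batch 0 still needs "id" to evaluate the filter; by batch 1 the filter
    // is fully applied and only "left" remains.
    let per_batch_projections = vec![vec!["id", "left"], vec!["left"]];

    for (batch, projection) in per_batch_projections.iter().enumerate() {
        for (file_idx, fields) in data_files.iter().enumerate() {
            let hit: Vec<&str> = fields
                .iter()
                .copied()
                .filter(|f| projection.contains(f))
                .collect();
            if hit.is_empty() {
                // Before this fix, an empty per-file projection panicked;
                // the fix skips the file instead.
                println!("batch {batch}: skip data file {file_idx}");
            } else {
                println!("batch {batch}: read {hit:?} from data file {file_idx}");
            }
        }
    }
}
```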

Closes #1871
westonpace authored Jan 27, 2024
1 parent 6edcad7 commit e733bae
Showing 2 changed files with 60 additions and 15 deletions.
python/python/tests/test_dataset.py (21 additions, 0 deletions)
@@ -619,6 +619,27 @@ def test_merge_with_commit(tmp_path: Path):
     assert tbl == expected
 
 
+def test_merge_search(tmp_path: Path):
+    left_table = pa.Table.from_pydict({"id": [1, 2, 3], "left": ["a", "b", "c"]})
+    right_table = pa.Table.from_pydict({"id": [1, 2, 3], "right": ["A", "B", "C"]})
+
+    left_ds = lance.write_dataset(left_table, tmp_path / "left")
+
+    right_ds = lance.write_dataset(right_table, tmp_path / "right")
+    left_ds.merge(right_ds, "id")
+
+    full = left_ds.to_table()
+    full_filtered = left_ds.to_table(filter="id < 3")
+
+    partial = left_ds.to_table(columns=["left"])
+
+    assert full.column("left") == partial.column("left")
+
+    partial = left_ds.to_table(columns=["left"], filter="id < 3")
+
+    assert full_filtered.column("left") == partial.column("left")
+
+
 def test_data_files(tmp_path: Path):
     table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
     base_dir = tmp_path / "test"
rust/lance/src/dataset/fragment.rs (39 additions, 15 deletions)
@@ -723,6 +723,9 @@ pub struct FragmentReader {
 
     /// ID of the fragment
     fragment_id: usize,
+
+    /// True if we are reading the row id
+    with_row_id: bool,
 }
 
 impl std::fmt::Display for FragmentReader {
@@ -772,10 +775,12 @@ impl FragmentReader {
             readers,
             deletion_vec,
             fragment_id,
+            with_row_id: false,
         })
     }
 
     pub(crate) fn with_row_id(&mut self) -> &mut Self {
+        self.with_row_id = true;
         self.readers[0].0.with_row_id(true);
         self
     }
@@ -856,22 +861,41 @@
         params: impl Into<ReadBatchParams> + Clone,
         projection: &Schema,
     ) -> Result<RecordBatch> {
-        let read_tasks = self.readers.iter().map(|(reader, schema)| {
-            let projection = schema.intersection(projection);
-            let params = params.clone();
-
-            async move {
-                reader
-                    .read_batch(
-                        batch_id as i32,
-                        params,
-                        &projection?,
-                        self.deletion_vec.as_ref().map(|dv| dv.as_ref()),
-                    )
-                    .await
-            }
-        });
+        let read_tasks = self
+            .readers
+            .iter()
+            .enumerate()
+            .map(|(reader_idx, (reader, schema))| {
+                let projection = schema.intersection(projection);
+                let params = params.clone();
+
+                async move {
+                    // Apply ? inside the task to keep read_tasks a simple iter of futures
+                    // for try_join_all
+                    let projection = projection?;
+                    // We always get the row_id from the first reader and so we need that even
+                    // if the projection is empty
+                    let need_for_row_id = self.with_row_id && reader_idx == 0;
+                    if projection.fields.is_empty() && !need_for_row_id {
+                        // The projection caused one of the data files to become
+                        // irrelevant and so we can skip it
+                        Result::Ok(None)
+                    } else {
+                        Ok(Some(
+                            reader
+                                .read_batch(
+                                    batch_id as i32,
+                                    params,
+                                    &projection,
+                                    self.deletion_vec.as_ref().map(|dv| dv.as_ref()),
+                                )
+                                .await?,
+                        ))
+                    }
+                }
+            });
         let batches = try_join_all(read_tasks).await?;
+        let batches = batches.into_iter().flatten().collect::<Vec<_>>();
         let result = merge_batches(&batches)?;
 
         Ok(result)
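
The shape of the fix is worth noting: each per-file read task now resolves to
`Ok(None)` when its intersected projection is empty, so `try_join_all` still
succeeds, and the `flatten()` that follows drops the placeholders before the
surviving batches are merged. A standalone sketch of that pattern, assuming
the `futures` and `tokio` crates (the function and data here are illustrative,
not lance APIs):

```
use futures::future::try_join_all;

// Stand-in for a per-data-file read: a file made irrelevant by the
// projection resolves to Ok(None) rather than an error.
async fn read_if_relevant(file_idx: usize, relevant: bool) -> Result<Option<String>, String> {
    if relevant {
        Ok(Some(format!("batch-from-file-{file_idx}")))
    } else {
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // File 1 is skipped by the projection; files 0 and 2 are read.
    let tasks = (0..3).map(|i| read_if_relevant(i, i != 1));
    let results = try_join_all(tasks).await?;

    // flatten() discards the None placeholders left by skipped files.
    let batches: Vec<String> = results.into_iter().flatten().collect();
    assert_eq!(batches, vec!["batch-from-file-0", "batch-from-file-2"]);
    Ok(())
}
```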
