From 2d3dd67cd0a77e21c3cbbd2b5982fafcef7d48c2 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Tue, 5 Nov 2024 14:30:58 -0800
Subject: [PATCH] feat: add compaction support to balanced datasets (#3088)

This allows compaction to succeed on the default storage of a balanced
dataset. Running compaction on the sibling storage can be added in a
future PR.

This PR also adds quite a few more test cases to make sure that a
balanced dataset either performs as expected or gives a good "not yet
supported" error message.

Finally, this PR reworks the dataset-offset-based take (e.g.
`LanceDataset::take`) to reuse the id-based and address-based take
paths (e.g. `TakeBuilder`). It also fixes a bug in the `TakeBuilder`
path where duplicate IDs were not handled. These latter changes are not
strictly needed, but they prepare for an eventual revamp of the take
operation to address https://github.com/lancedb/lance/issues/2977
---
 python/python/lance/dataset.py         |   3 +
 python/python/tests/test_balanced.py   | 205 ++++++++++++++++++---
 rust/lance/src/dataset.rs              |  11 +-
 rust/lance/src/dataset/fragment.rs     |   8 +
 rust/lance/src/dataset/scanner.rs      |  37 +++-
 rust/lance/src/dataset/take.rs         | 245 ++++++++++++-------------
 rust/lance/src/dataset/write.rs        |   4 +-
 rust/lance/src/dataset/write/update.rs |  13 ++
 8 files changed, 356 insertions(+), 170 deletions(-)

diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 2ab55250a1..337916b9f2 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -3040,6 +3040,9 @@ def compact_files(
         max_rows_per_group: int, default 1024
             Max number of rows per group. This does not affect which fragments
             need compaction, but does affect how they are re-written if
             selected.
+
+            This setting only affects datasets using the legacy storage format.
+            The newer format does not require row groups.
         max_bytes_per_file: Optional[int], default None
             Max number of bytes in a single file.
             This does not affect which fragments need compaction, but does
             affect how they are re-written if selected.
diff --git a/python/python/tests/test_balanced.py b/python/python/tests/test_balanced.py
index 59490cdfed..e2713c38c9 100644
--- a/python/python/tests/test_balanced.py
+++ b/python/python/tests/test_balanced.py
@@ -12,29 +12,33 @@ def big_val():
     return b"0" * 1024 * 1024
 
 
+def make_table(offset, num_rows, big_val):
+    end = offset + num_rows
+    values = pa.array([big_val for _ in range(num_rows)], pa.large_binary())
+    idx = pa.array(range(offset, end), pa.uint64())
+    table = pa.record_batch(
+        [values, idx],
+        schema=pa.schema(
+            [
+                pa.field(
+                    "blobs",
+                    pa.large_binary(),
+                    metadata={
+                        "lance-schema:storage-class": "blob",
+                    },
+                ),
+                pa.field("idx", pa.uint64()),
+            ]
+        ),
+    )
+    return table
+
+
 # 16 batches of 8 rows = 128 rows
-def balanced_datagen(big_val):
-    for batch_idx in range(16):
-        start = batch_idx * 8
-        end = start + 8
-        values = pa.array([big_val for _ in range(start, end)], pa.large_binary())
-        idx = pa.array(range(start, end), pa.uint64())
-        table = pa.record_batch(
-            [values, idx],
-            schema=pa.schema(
-                [
-                    pa.field(
-                        "blobs",
-                        pa.large_binary(),
-                        metadata={
-                            "lance-schema:storage-class": "blob",
-                        },
-                    ),
-                    pa.field("idx", pa.uint64()),
-                ]
-            ),
-        )
-        yield table
+def balanced_datagen(big_val, rows_per_batch, num_batches, offset=0):
+    for batch_idx in range(num_batches):
+        start = offset + (batch_idx * rows_per_batch)
+        yield make_table(start, rows_per_batch, big_val)
 
 
 @pytest.fixture
@@ -42,9 +46,11 @@ def balanced_dataset(tmp_path, big_val):
     # 16 MiB per file, 128 total MiB, so we should have 8 blob files
     #
     # In addition, max_rows_per_file=64 means we should get 2 regular files
-    schema = next(iter(balanced_datagen(big_val))).schema
+    rows_per_batch = 8
+    num_batches = 16
+    schema = next(iter(balanced_datagen(big_val, 1, 1))).schema
     return lance.write_dataset(
-        balanced_datagen(big_val),
+        balanced_datagen(big_val, rows_per_batch, num_batches),
         tmp_path / "test_ds",
         max_bytes_per_file=16 * 1024 * 1024,
         max_rows_per_file=64,
@@ -64,8 +70,10 @@ def test_append_then_take(balanced_dataset, tmp_path, big_val):
     )
 
     # Now verify we can append some data
+    rows_per_batch = 8
+    num_batches = 16
     ds = lance.write_dataset(
-        balanced_datagen(big_val),
+        balanced_datagen(big_val, rows_per_batch, num_batches),
         tmp_path / "test_ds",
         max_bytes_per_file=32 * 1024 * 1024,
         schema=balanced_dataset.schema,
@@ -118,6 +126,153 @@ def test_delete(balanced_dataset):
     assert len(balanced_dataset._take_rows(range(20, 80), columns=["blobs"])) == 20
 
 
+def test_scan(balanced_dataset):
+    # Scan without any special arguments should only return non-blob columns
+    expected = pa.table({"idx": pa.array(range(128), pa.uint64())})
+    assert balanced_dataset.to_table() == expected
+    assert balanced_dataset.to_table(columns=["idx"]) == expected
+    # Can filter on regular columns
+    assert balanced_dataset.to_table(columns=["idx"], filter="idx < 1000") == expected
+
+    # Scanning a blob column should fail (this will be supported in the future
+    # but for now we need to make sure it fails cleanly so users don't shoot
+    # themselves in the foot)
+    with pytest.raises(
+        ValueError, match="Not supported.*Scanning.*non-default storage"
+    ):
+        balanced_dataset.to_table(columns=["idx", "blobs"])
+    with pytest.raises(
+        ValueError, match="Not supported.*Scanning.*non-default storage"
+    ):
+        balanced_dataset.to_table(columns=["blobs"])
+
+    # Can't filter on blob columns either
+    with pytest.raises(
+        ValueError,
+        match="Not supported.*non-default storage columns cannot be used as filters",
+    ):
+        balanced_dataset.to_table(columns=["idx"], filter="blobs IS NOT NULL")
+
+
+def test_compaction(tmp_path, big_val):
+    # Make a bunch of small 1-row writes
+    schema = next(iter(balanced_datagen(big_val, 1, 1))).schema
+    for write_idx in range(40):
+        lance.write_dataset(
+            balanced_datagen(big_val, 1, 1, offset=write_idx),
+            tmp_path / "test_ds",
+            max_bytes_per_file=16 * 1024 * 1024,
+            max_rows_per_file=64,
+            schema=schema,
+            mode="append",
+        )
+    # Run compaction. Normal storage should compact to 1 file. Blob storage
+    # should compact to 3 files (40 MiB at 16 MiB per file)
+    ds = lance.dataset(tmp_path / "test_ds")
+    ds.optimize.compact_files(max_bytes_per_file=16 * 1024 * 1024)
+
+    assert len(ds.get_fragments()) == 1
+
+    # TODO: Add support for compacting the blob files. For now, we just leave them
+    # uncompacted
+    assert len(list((tmp_path / "test_ds" / "_blobs" / "data").iterdir())) == 40
+
+    # Make sure we can still scan / take
+
+    assert ds.to_table() == pa.table(
+        {
+            "idx": pa.array(range(40), pa.uint64()),
+        }
+    )
+    row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid")
+    assert row_ids.to_pylist() == list(range(40))
+
+    assert ds._take_rows(row_ids.to_pylist(), columns=["idx"]) == pa.table(
+        {
+            "idx": pa.array(range(40), pa.uint64()),
+        }
+    )
+    assert ds._take_rows(row_ids.to_pylist(), columns=["blobs"]) == pa.table(
+        {
+            "blobs": pa.array([big_val for _ in range(40)], pa.large_binary()),
+        }
+    )
+
+
+def test_schema(balanced_dataset):
+    # Schema should contain blob columns
+    assert balanced_dataset.schema == pa.schema(
+        [
+            pa.field(
+                "blobs",
+                pa.large_binary(),
+                metadata={
+                    "lance-schema:storage-class": "blob",
+                },
+            ),
+            pa.field("idx", pa.uint64()),
+        ]
+    )
+
+
+def test_sample(balanced_dataset):
+    assert balanced_dataset.sample(10, columns=["idx"]).num_rows == 10
+    # Not the most obvious error but hopefully not long lived
+    with pytest.raises(
+        OSError, match="Not supported.*mapping from row addresses to row ids"
+    ):
+        assert balanced_dataset.sample(10).num_rows == 10
+    with pytest.raises(
+        OSError, match="Not supported.*mapping from row addresses to row ids"
+    ):
+        assert balanced_dataset.sample(10, columns=["blobs"]).num_rows == 10
+
+
+def test_add_columns(tmp_path, balanced_dataset):
+    # Adding columns should be fine as long as we don't try to use the blob
+    # column in any way
+
+    balanced_dataset.add_columns(
+        {
+            "idx2": "idx * 2",
+        }
+    )
+
+    assert balanced_dataset.to_table() == pa.table(
+        {
+            "idx": pa.array(range(128), pa.uint64()),
+            "idx2": pa.array(range(0, 256, 2), pa.uint64()),
+        }
+    )
+
+    with pytest.raises(
+        OSError, match="Not supported.*adding columns.*scanning non-default storage"
+    ):
+        balanced_dataset.add_columns({"blobs2": "blobs"})
+
+
+def test_unsupported(balanced_dataset, big_val):
+    # The following operations are not yet supported and we need to make
+    # sure they fail with a useful error message
+
+    # Updates & merge-insert are not supported. They add new rows and we
+    # will need to make sure the sibling datasets are kept in sync.
+
+    with pytest.raises(
+        ValueError, match="Not supported.*Updating.*non-default storage"
+    ):
+        balanced_dataset.update({"idx": "0"})
+
+    with pytest.raises(
+        # This error could be nicer but it's fine for now
+        OSError,
+        match="Not supported.*Scanning.*non-default storage",
+    ):
+        balanced_dataset.merge_insert("idx").when_not_matched_insert_all().execute(
+            make_table(0, 1, big_val)
+        )
+
+
 # TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
 # want to test partial appends. We need to make sure an append of
 # non-blob data is supported. In order to do this we need to make
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 7668156888..d470f2f7ee 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1199,12 +1199,7 @@ impl Dataset {
         row_indices: &[u64],
         projection: impl Into<ProjectionRequest>,
     ) -> Result<RecordBatch> {
-        take::take(
-            self,
-            row_indices,
-            &projection.into().into_projection_plan(self.schema())?,
-        )
-        .await
+        take::take(self, row_indices, projection.into()).await
     }
 
     /// Take Rows by the internal ROW ids.
@@ -1605,10 +1600,6 @@ impl Dataset {
             .collect())
     }
 
-    // Leaving this here so it is more obvious to future readers that we can do this and
-    // someone doesn't go off and create a new function to do this. Delete this comment
-    // if you use this method.
-    #[allow(unused)]
     pub(crate) async fn filter_deleted_addresses(&self, addrs: &[u64]) -> Result<Vec<u64>> {
         self.filter_addr_or_ids(addrs, addrs).await
     }
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index d80967a4ce..d2d6d9e604 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -1186,6 +1186,14 @@ impl FileFragment {
             }
             schema = schema.project(&projection)?;
         }
+
+        if schema.fields.iter().any(|f| !f.is_default_storage()) {
+            return Err(Error::NotSupported {
+                source: "adding columns whose value depends on scanning non-default storage".into(),
+                location: location!(),
+            });
+        }
+
         // If there is no projection, we at least need to read the row addresses
         with_row_addr |= schema.fields.is_empty();
diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index b55025bf95..23a441a12f 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -426,9 +426,16 @@ impl Scanner {
         &mut self,
         columns: &[(impl AsRef<str>, impl AsRef<str>)],
     ) -> Result<&mut Self> {
-        let physical_schema = self.scan_output_schema(true)?;
+        let base_schema = self.scan_output_schema(self.dataset.schema(), true)?;
         self.projection_plan =
-            ProjectionPlan::try_new(&physical_schema, columns, /*load_blobs=*/ false)?;
+            ProjectionPlan::try_new(&base_schema, columns, /*load_blobs=*/ false)?;
+        if self.projection_plan.sibling_schema.is_some() {
+            return Err(Error::NotSupported {
+                source: "Scanning columns with non-default storage class is not yet supported"
+                    .into(),
+                location: location!(),
+            });
+        }
         Ok(self)
     }
 
@@ -859,15 +866,17 @@ impl Scanner {
     ///
     /// This includes columns that are added by the scan but don't exist in the dataset
    /// schema (e.g. _distance, _rowid, _rowaddr)
-    pub(crate) fn scan_output_schema(&self, force_row_id: bool) -> Result<Arc<Schema>> {
+    pub(crate) fn scan_output_schema(
+        &self,
+        base_schema: &Schema,
+        force_row_id: bool,
+    ) -> Result<Arc<Schema>> {
         let extra_columns = self.get_extra_columns(force_row_id);
 
         let schema = if !extra_columns.is_empty() {
-            self.projection_plan
-                .physical_schema
-                .merge(&ArrowSchema::new(extra_columns))?
+            base_schema.merge(&ArrowSchema::new(extra_columns))?
         } else {
-            self.projection_plan.physical_schema.as_ref().clone()
+            base_schema.clone()
         };
 
         // drop metadata
@@ -888,7 +897,10 @@ impl Scanner {
         // Append the extra columns
         let mut output_expr = self.projection_plan.to_physical_exprs()?;
 
-        let physical_schema = ArrowSchema::from(self.scan_output_schema(false)?.as_ref());
+        let physical_schema = ArrowSchema::from(
+            self.scan_output_schema(&self.projection_plan.physical_schema, false)?
+                .as_ref(),
+        );
 
         // distance goes before the row_id column
         if self.nearest.is_some() && output_expr.iter().all(|(_, name)| name != DIST_COL) {
@@ -1043,6 +1055,12 @@ impl Scanner {
         // which do not exist in the dataset schema but are added by the scan. We can ignore
         // those as eager columns.
         let filter_schema = self.dataset.schema().project_or_drop(&columns)?;
+        if filter_schema.fields.iter().any(|f| !f.is_default_storage()) {
+            return Err(Error::NotSupported {
+                source: "non-default storage columns cannot be used as filters".into(),
+                location: location!(),
+            });
+        }
         let physical_schema = self.projection_plan.physical_schema.clone();
         let remaining_schema = physical_schema.exclude(&filter_schema)?;
 
@@ -1367,7 +1385,8 @@ impl Scanner {
         }
 
         // Stage 5: take remaining columns required for projection
-        let physical_schema = self.scan_output_schema(false)?;
+        let physical_schema =
+            self.scan_output_schema(&self.projection_plan.physical_schema, false)?;
         let remaining_schema = physical_schema.exclude(plan.schema().as_ref())?;
         if !remaining_schema.fields.is_empty() {
             plan = self.take(plan, &remaining_schema, self.batch_readahead)?;
diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs
index f3ff60dd3f..c390bbd45c 100644
--- a/rust/lance/src/dataset/take.rs
+++ b/rust/lance/src/dataset/take.rs
@@ -8,9 +8,8 @@ use crate::dataset::rowids::get_row_id_index;
 use crate::{Error, Result};
 use arrow::{array::as_struct_array, compute::concat_batches, datatypes::UInt64Type};
 use arrow_array::cast::AsArray;
-use arrow_array::{Array, RecordBatch, StructArray, UInt64Array};
+use arrow_array::{RecordBatch, StructArray, UInt64Array};
 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
-use arrow_select::interleave::interleave;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use futures::{Future, Stream, StreamExt, TryStreamExt};
@@ -26,142 +25,70 @@ use super::{fragment::FileFragment, scanner::DatasetRecordBatchStream, Dataset};
 
 pub async fn take(
     dataset: &Dataset,
-    row_indices: &[u64],
-    projection: &ProjectionPlan,
+    offsets: &[u64],
+    projection: ProjectionRequest,
 ) -> Result<RecordBatch> {
-    if row_indices.is_empty() {
+    let projection = projection.into_projection_plan(dataset.schema())?;
+
+    if offsets.is_empty() {
         return Ok(RecordBatch::new_empty(Arc::new(
             projection.output_schema()?,
         )));
     }
 
-    let mut sorted_indices: Vec<usize> = (0..row_indices.len()).collect();
-    sorted_indices.sort_by_key(|&i| row_indices[i]);
-
-    let fragments = dataset.get_fragments().into_iter().map(Arc::new);
-
-    // We will split into sub-requests for each fragment.
-    let mut sub_requests: Vec<(Arc<FileFragment>, Range<usize>)> = Vec::new();
-    // We will remap the row indices to the original row indices, using a pair
-    // of (request position, position in request)
-    let mut remap_index: Vec<(usize, usize)> = vec![(0, 0); row_indices.len()];
-    let mut local_ids_buffer: Vec<u32> = Vec::with_capacity(row_indices.len());
-
-    let mut fragments_iter = fragments.into_iter();
-    let mut current_fragment = fragments_iter.next().ok_or_else(|| Error::InvalidInput {
-        source: "Called take on an empty dataset.".to_string().into(),
-        location: location!(),
-    })?;
-    let mut current_fragment_len = current_fragment.count_rows().await?;
-    let mut curr_fragment_offset: u64 = 0;
-    let mut current_fragment_end = current_fragment_len as u64;
-    let mut start = 0;
-    let mut end = 0;
-    // We want to keep track of the previous row_index to detect duplicates
-    // index takes. To start, we pick a value that is guaranteed to be different
-    // from the first row_index.
-    let mut previous_row_index: u64 = row_indices[sorted_indices[0]] + 1;
-    let mut previous_sorted_index: usize = 0;
-
-    for index in sorted_indices {
-        // Get the index
-        let row_index = row_indices[index];
-
-        if previous_row_index == row_index {
-            // If we have a duplicate index request we add a remap_index
-            // entry that points to the original index request.
-            remap_index[index] = remap_index[previous_sorted_index];
-            continue;
-        } else {
-            previous_sorted_index = index;
-            previous_row_index = row_index;
-        }
-
-        // If the row index is beyond the current fragment, iterate
-        // until we find the fragment that contains it.
-        while row_index >= current_fragment_end {
-            // If we have a non-empty sub-request, add it to the list
-            if end - start > 0 {
-                sub_requests.push((current_fragment, start..end));
-            }
+    // First, convert the dataset offsets into row addresses
+    let fragments = dataset.get_fragments();
 
-            start = end;
+    let mut perm = permutation::sort(offsets);
+    let sorted_offsets = perm.apply_slice(offsets);
 
-            current_fragment = fragments_iter.next().ok_or_else(|| Error::InvalidInput {
-                source: format!(
-                    "Row index {} is beyond the range of the dataset.",
-                    row_index
-                )
-                .into(),
-                location: location!(),
-            })?;
-            curr_fragment_offset += current_fragment_len as u64;
-            current_fragment_len = current_fragment.count_rows().await?;
-            current_fragment_end = curr_fragment_offset + current_fragment_len as u64;
+    let mut frag_iter = fragments.iter();
+    let mut cur_frag = frag_iter.next();
+    let mut cur_frag_rows = if let Some(cur_frag) = cur_frag {
+        cur_frag.count_rows().await? as u64
+    } else {
+        0
+    };
+    let mut frag_offset = 0;
+
+    let mut addrs = Vec::with_capacity(sorted_offsets.len());
+    for sorted_offset in sorted_offsets.into_iter() {
+        while cur_frag.is_some() && sorted_offset >= frag_offset + cur_frag_rows {
+            frag_offset += cur_frag_rows;
+            cur_frag = frag_iter.next();
+            cur_frag_rows = if let Some(cur_frag) = cur_frag {
+                cur_frag.count_rows().await? as u64
+            } else {
+                0
+            };
         }
-
-        // Note that we cast to u32 *after* subtracting the offset,
-        // since it is possible for the global index to be larger than
-        // u32::MAX.
-        let local_index = (row_index - curr_fragment_offset) as u32;
-        local_ids_buffer.push(local_index);
-
-        remap_index[index] = (sub_requests.len(), end - start);
-
-        end += 1;
+        let Some(cur_frag) = cur_frag else {
+            addrs.push(RowAddress::TOMBSTONE_ROW);
+            continue;
+        };
+        let row_addr =
+            RowAddress::new_from_parts(cur_frag.id() as u32, (sorted_offset - frag_offset) as u32);
+        addrs.push(u64::from(row_addr));
     }
 
-    // flush last batch
-    if end - start > 0 {
-        sub_requests.push((current_fragment, start..end));
-    }
+    // Restore the original order
+    perm.apply_inv_slice_in_place(&mut addrs);
 
-    let local_ids_buffer = Arc::new(local_ids_buffer);
+    let builder = TakeBuilder::try_new_from_addresses(
+        Arc::new(dataset.clone()),
+        addrs,
+        Arc::new(projection),
+    )?;
 
-    let take_tasks = sub_requests
-        .into_iter()
-        .map(|(fragment, indices_range)| {
-            let local_ids_buffer = local_ids_buffer.clone();
-            let physical_schema = projection.physical_schema.clone();
-            async move {
-                let local_ids = &local_ids_buffer[indices_range];
-                fragment.take(local_ids, &physical_schema).await
-            }
-        })
-        .collect::<Vec<_>>();
-    let take_stream = futures::stream::iter(take_tasks)
-        .buffered(dataset.object_store.io_parallelism())
-        .map_err(|err| DataFusionError::External(err.into()))
-        .boxed();
-    let take_stream = Box::pin(RecordBatchStreamAdapter::new(
-        projection.arrow_schema_ref(),
-        take_stream,
-    ));
-    let take_stream = projection.project_stream(take_stream)?;
-    let batches = take_stream.try_collect::<Vec<_>>().await?;
-
-    let struct_arrs: Vec<StructArray> = batches.into_iter().map(StructArray::from).collect();
-    let refs: Vec<_> = struct_arrs.iter().map(|x| x as &dyn Array).collect();
-    let reordered = interleave(&refs, &remap_index)?;
-    Ok(as_struct_array(&reordered).into())
+    take_rows(builder).await
 }
 
 /// Take rows by the internal ROW ids.
 async fn do_take_rows(
-    builder: TakeBuilder,
+    mut builder: TakeBuilder,
     projection: Arc<ProjectionPlan>,
 ) -> Result<RecordBatch> {
-    let row_addrs = if let Some(row_id_index) = get_row_id_index(&builder.dataset).await? {
-        let addresses = builder
-            .row_ids
-            .iter()
-            .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
-            .collect::<Vec<u64>>();
-        addresses
-    } else {
-        builder.row_ids
-    };
+    let row_addrs = builder.get_row_addrs().await?.clone();
 
     if row_addrs.is_empty() {
         // It is possible that `row_id_index` returns None when a fragment has been wholly deleted
@@ -275,6 +202,8 @@
         // Slow case: need to re-map data into expected order
         let mut sorted_row_addrs = row_addrs.clone();
         sorted_row_addrs.sort();
+        // Go ahead and dedup, we will reinsert duplicates during the remapping
+        sorted_row_addrs.dedup();
         // Group ROW Ids by the fragment
         let mut row_addrs_per_fragment: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
         sorted_row_addrs.iter().for_each(|row_addr| {
@@ -330,7 +259,9 @@
             })
             .collect();
 
-        debug_assert_eq!(remapping_index.len(), one_batch.num_rows());
+        // remapping_index may be greater than the number of rows in one_batch
+        // if there are duplicates in the requested row ids. This is expected.
+        debug_assert!(remapping_index.len() >= one_batch.num_rows());
 
         // Remove the rowaddr column.
         let keep_indices = (0..one_batch.num_columns() - 1).collect::<Vec<_>>();
@@ -382,7 +313,7 @@ async fn zip_takes(
 }
 
 async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
-    if builder.row_ids.is_empty() {
+    if builder.is_empty() {
         return Ok(RecordBatch::new_empty(Arc::new(
             builder.projection.output_schema()?,
         )));
@@ -394,7 +325,7 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
     // If we have sibling columns then we load those in parallel to the local
     // columns and zip the results together.
    let sibling_take = if let Some(sibling_schema) = projection.sibling_schema.as_ref() {
-        let filtered_row_ids = builder.dataset.filter_deleted_ids(&builder.row_ids).await?;
+        let filtered_row_ids = builder.get_filtered_ids().await?;
         if filtered_row_ids.is_empty() {
             return Ok(RecordBatch::new_empty(Arc::new(
                 builder.projection.output_schema()?,
@@ -411,7 +342,8 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
         // The sibling take only takes valid row ids and sibling columns
         let mut builder = builder.clone();
         builder.dataset = sibling_ds;
-        builder.row_ids = filtered_row_ids;
+        builder.row_ids = Some(filtered_row_ids);
+        builder.row_addrs = None;
         let blobs_projection = Arc::new(ProjectionPlan::inner_new(
             sibling_schema.clone(),
             false,
@@ -509,7 +441,8 @@ fn check_row_addrs(row_ids: &[u64]) -> RowAddressStats {
 #[derive(Clone, Debug)]
 pub struct TakeBuilder {
     dataset: Arc<Dataset>,
-    row_ids: Vec<u64>,
+    row_ids: Option<Vec<u64>>,
+    row_addrs: Option<Vec<u64>>,
     projection: Arc<ProjectionPlan>,
     with_row_address: bool,
 }
@@ -522,13 +455,29 @@ impl TakeBuilder {
         projection: ProjectionRequest,
     ) -> Result<Self> {
         Ok(Self {
-            row_ids,
+            row_ids: Some(row_ids),
+            row_addrs: None,
             projection: Arc::new(projection.into_projection_plan(dataset.schema())?),
             dataset,
             with_row_address: false,
         })
     }
 
+    /// Create a new `TakeBuilder` for taking by address
+    pub fn try_new_from_addresses(
+        dataset: Arc<Dataset>,
+        addresses: Vec<u64>,
+        projection: Arc<ProjectionPlan>,
+    ) -> Result<Self> {
+        Ok(Self {
+            row_ids: None,
+            row_addrs: Some(addresses),
+            projection,
+            dataset,
+            with_row_address: false,
+        })
+    }
+
     /// Adds row addresses to the output
     pub fn with_row_address(mut self, with_row_address: bool) -> Self {
         self.with_row_address = with_row_address;
@@ -539,6 +488,52 @@ impl TakeBuilder {
     pub async fn execute(self) -> Result<RecordBatch> {
         take_rows(self).await
     }
+
+    pub fn is_empty(&self) -> bool {
+        match (self.row_ids.as_ref(), self.row_addrs.as_ref()) {
+            (Some(ids), _) => ids.is_empty(),
+            (_, Some(addrs)) => addrs.is_empty(),
+            _ => unreachable!(),
+        }
+    }
+
+    async fn get_filtered_ids(&self) -> Result<Vec<u64>> {
+        match (self.row_ids.as_ref(), self.row_addrs.as_ref()) {
+            (Some(ids), _) => self.dataset.filter_deleted_ids(ids).await,
+            (_, Some(addrs)) => {
+                let _filtered_addresses = self.dataset.filter_deleted_addresses(addrs).await?;
+                // TODO: Create an inverse mapping from addresses to ids
+                // This path is currently encountered in the "take by dataset offsets" case.
+                // Another solution could be to translate dataset offsets into row ids instead
+                // of translating them into row addresses.
+                Err(Error::NotSupported {
+                    source: "mapping from row addresses to row ids".into(),
+                    location: location!(),
+                })
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    async fn get_row_addrs(&mut self) -> Result<&Vec<u64>> {
+        if self.row_addrs.is_none() {
+            let row_ids = self
+                .row_ids
+                .as_ref()
+                .expect("row_ids must be set if row_addrs is not");
+            let addrs = if let Some(row_id_index) = get_row_id_index(&self.dataset).await? {
+                let addresses = row_ids
+                    .iter()
+                    .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
+                    .collect::<Vec<u64>>();
+                addresses
+            } else {
+                row_ids.clone()
+            };
+            self.row_addrs = Some(addrs);
+        }
+        Ok(self.row_addrs.as_ref().unwrap())
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index 4e9b42ef20..f28edee8a3 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -345,7 +345,9 @@ pub async fn write_fragments_internal(
         (schema, params.storage_version_or_default())
     };
 
-    let (data, blob_data) = data.extract_blob_stream(&schema);
+    let data_schema = schema.project_by_schema(data.schema().as_ref())?;
+
+    let (data, blob_data) = data.extract_blob_stream(&data_schema);
 
     // Some params we borrow from the normal write, some we override
     let blob_write_params = WriteParams {
diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs
index 43ae216792..a2a37cdfcc 100644
--- a/rust/lance/src/dataset/write/update.rs
+++ b/rust/lance/src/dataset/write/update.rs
@@ -160,6 +160,19 @@ impl UpdateBuilder {
     //  pub fn with_write_params(mut self, params: WriteParams) -> Self { ... }
 
     pub fn build(self) -> Result<UpdateJob> {
+        if self
+            .dataset
+            .schema()
+            .fields
+            .iter()
+            .any(|f| !f.is_default_storage())
+        {
+            return Err(Error::NotSupported {
+                source: "Updating datasets containing non-default storage columns".into(),
+                location: location!(),
+            });
+        }
+
         let mut updates = HashMap::new();
         let planner = Planner::new(Arc::new(self.dataset.schema().into()));
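For readers skimming this patch, here is a minimal end-to-end sketch (not part of the change itself) of the workflow that `test_compaction` exercises. The dataset path is hypothetical and the sizes are illustrative; it assumes a `lance` build that includes this patch.

```python
import lance
import pyarrow as pa

# A column is routed to sibling (blob) storage via field metadata.
schema = pa.schema(
    [
        pa.field(
            "blobs",
            pa.large_binary(),
            metadata={"lance-schema:storage-class": "blob"},
        ),
        pa.field("idx", pa.uint64()),
    ]
)

# Many tiny appends create many small fragments (and blob files).
for i in range(4):
    batch = pa.record_batch(
        [
            pa.array([b"0" * 1024 * 1024], pa.large_binary()),
            pa.array([i], pa.uint64()),
        ],
        schema=schema,
    )
    lance.write_dataset([batch], "/tmp/demo_ds", schema=schema, mode="append")

ds = lance.dataset("/tmp/demo_ds")

# With this patch, compaction succeeds on the default storage; the sibling
# blob files are left uncompacted for now.
ds.optimize.compact_files(max_bytes_per_file=16 * 1024 * 1024)
assert len(ds.get_fragments()) == 1
```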
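The heart of the reworked `take::take` is the translation from dataset offsets to row addresses: sort the offsets, walk the fragments once while accumulating row counts, pack each offset into a (fragment id, local offset) address, and undo the sort before handing the addresses to `TakeBuilder`. The following is a hedged Python sketch of that logic, not the Rust implementation; fragments are simplified to `(id, num_rows)` pairs and the tombstone sentinel is assumed to be `u64::MAX`.

```python
# Illustrative sketch of the offset -> row-address conversion in `take::take`.
TOMBSTONE_ROW = 0xFFFFFFFFFFFFFFFF  # assumed sentinel for out-of-range offsets


def offsets_to_addresses(offsets, fragments):
    """fragments: list of (fragment_id, num_rows) pairs, in dataset order."""
    # Sort the requested offsets, remembering each one's original position.
    perm = sorted(range(len(offsets)), key=lambda i: offsets[i])
    addrs = [0] * len(offsets)

    frag_iter = iter(fragments)
    cur = next(frag_iter, None)
    frag_offset = 0
    for i in perm:
        offset = offsets[i]
        # Advance to the fragment that contains this offset.
        while cur is not None and offset >= frag_offset + cur[1]:
            frag_offset += cur[1]
            cur = next(frag_iter, None)
        if cur is None:
            addrs[i] = TOMBSTONE_ROW  # offset is beyond the dataset
        else:
            # A row address packs (fragment_id, local_offset) into a u64.
            addrs[i] = (cur[0] << 32) | (offset - frag_offset)
    # Writing through `perm` restores the caller's order, the same job
    # `apply_inv_slice_in_place` does in the Rust code.
    return addrs


# Two fragments of 3 rows each; duplicates and out-of-range offsets are fine.
assert offsets_to_addresses([4, 0, 4, 99], [(0, 3), (7, 3)]) == [
    (7 << 32) | 1, 0, (7 << 32) | 1, TOMBSTONE_ROW,
]
```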
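The duplicate-ID fix in `do_take_rows` is subtle: the sorted address list is now deduplicated before I/O, and the remapping index reinserts the duplicates afterward, which is why `remapping_index.len()` may exceed the batch's row count. A small Python sketch of the idea (illustrative only, with hypothetical names):

```python
# The I/O layer returns one row per unique sorted address; the remapping
# index maps each originally-requested address (duplicates included) back
# to its row in that deduplicated batch.
def build_remapping_index(row_addrs):
    sorted_unique = sorted(set(row_addrs))  # sort + dedup, as in the Rust code
    position = {addr: i for i, addr in enumerate(sorted_unique)}
    # len(result) >= len(sorted_unique) whenever duplicates were requested.
    return [position[addr] for addr in row_addrs]


assert build_remapping_index([30, 10, 30, 20]) == [2, 0, 2, 1]
```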