diff --git a/protos/transaction.proto b/protos/transaction.proto index 3aee36995e..9d7cc5faf2 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -28,6 +28,101 @@ message Transaction { // Optional version tag. string tag = 3; + // The list of operations that make up this transaction. + // + // The user operations are logical groupings of actions. These actions can be + // applied in sequence to the manifest to produce the final manifest. + // + // By grouping actions together, we can provide a human-readable history of + // the table. + // + // For example, consider the SQL query: + // + // ```sql + // BEGIN TRANSACTION; + // INSERT INTO t VALUES (1); + // DELETE FROM t WHERE id = 32; + // UPDATE t SET a = 2 WHERE id = 1; + // COMMIT; + // ``` + // + // This would be represented as: + // + // ```yaml + // - description: "INSERT INTO t VALUES (1);" + // uuid: 123e4567-e89b-12d3-a456-426655440001 + // read_version: 0 + // actions: + // - type: add_fragments + // fragments: + // - id: 10 + // files: + // - path: data/123e4567-e89b-12d3-a456-426655440000.lance + // fields: [0] + // - description: "DELETE FROM t WHERE id = 32;" + // uuid: 123e4567-e89b-12d3-a456-426655440002 + // read_version: 0 + // actions: + // - type: update_fragments + // fragments: + // - id: 0 + // files: + // - path: data/dfdsfdsd-e89b-12d3-a456-426655440000.lance + // fields: [0] + // deletion_file: + // file_type: ARROW_ARRAY + // read_version: 0 + // id: 10 + // num_deleted_rows: 1 + // - description: "UPDATE t SET a = 2 WHERE id = 1;" + // uuid: 123e4567-e89b-12d3-a456-426655440003 + // read_version: 0 + // actions: + // - type: update_fragments + // fragments: + // - id: 0 + // files: + // - path: data/123e4567-sdfs-12d3-sdfs-426655440000.lance + // fields: [0] + // deletion_file: + // file_type: ARROW_ARRAY + // read_version: 0 + // id: 10 + // num_deleted_rows: 1 + // ``` + message CompositeOperation { + repeated UserOperation user_operations = 1; + } + + // A logical grouping of actions that corresponds to a single user action. + message UserOperation { + uint64 read_version = 1; + string uuid = 2; + string description = 4; + repeated Action actions = 5; + } + + // An action to apply to the manifest. + message Action { + message AddFragments { + repeated DataFragment fragments = 1; + } + + message DeleteFragments { + repeated uint64 deleted_fragment_ids = 1; + } + + message UpdateFragments { + repeated DataFragment updated_fragments = 1; + } + + oneof action { + AddFragments add_fragments = 1; + DeleteFragments delete_fragments = 2; + UpdateFragments update_fragments = 3; + } + } + // Add new rows to the dataset. message Append { // The new fragments to append.
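For readers wiring this up from Rust, a minimal sketch of constructing these messages through the prost-generated bindings follows. The `pb::transaction::*` paths mirror how the Rust changes below refer to the generated types; whether they are publicly re-exported is an assumption here, and the field values are illustrative.

```rust
// Hedged sketch: building a CompositeOperation with the prost-generated
// types. Module paths mirror the `pb::transaction::*` usage in the Rust
// changes below; UUIDs and descriptions are illustrative.
use lance_table::format::pb;

fn composite_operation_proto() -> pb::transaction::Operation {
    // One action: add (empty, for brevity) fragments to the table.
    let add_fragments = pb::transaction::Action {
        action: Some(pb::transaction::action::Action::AddFragments(
            pb::transaction::action::AddFragments { fragments: vec![] },
        )),
    };
    // Wrap it in a single user-facing operation inside the composite.
    pb::transaction::Operation::CompositeOperation(pb::transaction::CompositeOperation {
        user_operations: vec![pb::transaction::UserOperation {
            read_version: 0,
            uuid: "123e4567-e89b-12d3-a456-426655440001".to_string(),
            description: "INSERT INTO t VALUES (1);".to_string(),
            actions: vec![add_fragments],
        }],
    })
}
```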
@@ -178,6 +273,7 @@ message Transaction { Update update = 108; Project project = 109; UpdateConfig update_config = 110; + CompositeOperation composite_operation = 111; } // An operation to apply to the blob dataset diff --git a/rust/lance-table/src/format.rs b/rust/lance-table/src/format.rs index 5636f9138b..195ac49dce 100644 --- a/rust/lance-table/src/format.rs +++ b/rust/lance-table/src/format.rs @@ -8,6 +8,7 @@ use uuid::Uuid; mod fragment; mod index; mod manifest; +pub mod transaction; pub use fragment::*; pub use index::Index; diff --git a/rust/lance-table/src/format/transaction.rs b/rust/lance-table/src/format/transaction.rs new file mode 100644 index 0000000000..7ecce04fd6 --- /dev/null +++ b/rust/lance-table/src/format/transaction.rs @@ -0,0 +1,826 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Transaction definitions for updating datasets +//! +//! Prior to creating a new manifest, a transaction must be created representing +//! the changes being made to the dataset. By representing them as incremental +//! changes, we can detect whether concurrent operations are compatible with +//! one another. We can also rebuild manifests when retrying a failed commit. +//! +//! ## Conflict Resolution +//! +//! Transactions are compatible with one another if they don't conflict. +//! Currently, conflict resolution always assumes a Serializable isolation +//! level. +//! +//! Below are the compatibilities between concurrent transactions. The columns +//! represent the operation that has already been applied, while the rows represent the +//! operation that is being checked for compatibility to see if it can retry. +//! ✅ indicates that the operation is compatible, while ❌ indicates that it is +//! a conflict. Some operations have additional conditions that must be met for +//! them to be compatible. +//! +//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | +//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------| +//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | +//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ | +//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) | +//! | Create Index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | +//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ | +//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | +//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | +//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) | +//! +//! (1) Delete, update, and rewrite are compatible with each other and themselves only if +//! they affect distinct fragments. Otherwise, they conflict. +//! (2) Operations that mutate the config conflict if one of the operations upserts a key +//! that is referenced by another concurrent operation.
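To make the matrix concrete, here is a small sketch using the v1 `Operation` API introduced later in this diff (fragment IDs and predicates are made up):

```rust
use lance_table::format::Fragment;
use lance_table::format::transaction::Operation;

fn conflict_matrix_examples() {
    // Append vs. Delete: ✅ per the table above -- an append can retry after
    // a concurrent delete because it only adds brand-new fragments.
    let append = Operation::Append { fragments: vec![Fragment::new(10)] };
    let delete_a = Operation::Delete {
        updated_fragments: vec![Fragment::new(0)],
        deleted_fragment_ids: vec![1],
        predicate: "id = 32".to_string(),
    };
    assert!(!append.conflicts_with(&delete_a));

    // Delete vs. Delete: condition (1) -- they conflict only when they touch
    // the same fragments (both touch fragment 0 here).
    let delete_b = Operation::Delete {
        updated_fragments: vec![Fragment::new(0)],
        deleted_fragment_ids: vec![],
        predicate: "a = 2".to_string(),
    };
    assert!(delete_a.conflicts_with(&delete_b));
}
```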
+ +use std::{collections::HashSet, sync::Arc, time::SystemTime}; + +use crate::{ + format::{pb, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta}, + io::{ + commit::CommitHandler, + manifest::{read_manifest, read_manifest_indexes}, + }, + rowids::{write_row_ids, RowIdSequence}, +}; +use deepsize::DeepSizeOf; +use lance_core::{datatypes::Schema, Error, Result}; +use lance_file::version::LanceFileVersion; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; +use roaring::RoaringBitmap; +use snafu::{location, Location}; +use v2::UserOperation; + +use crate::feature_flags::{apply_feature_flags, FLAG_MOVE_STABLE_ROW_IDS}; +use crate::utils::timestamp_to_nanos; + +pub mod v1; +pub mod v2; + +pub use v1::{validate_operation, Operation, RewriteGroup, RewrittenIndex}; +pub use v2::Action; + +#[derive(Debug)] +pub struct ManifestWriteConfig { + pub auto_set_feature_flags: bool, // default true + pub timestamp: Option<SystemTime>, // default None + pub use_move_stable_row_ids: bool, // default false + pub use_legacy_format: Option<bool>, // default None + pub storage_format: Option<DataStorageFormat>, // default None +} + +impl Default for ManifestWriteConfig { + fn default() -> Self { + Self { + auto_set_feature_flags: true, + timestamp: None, + use_move_stable_row_ids: false, + use_legacy_format: None, + storage_format: None, + } + } +} + +/// A change to a dataset that can be retried +/// +/// This contains enough information to be able to build the next manifest, +/// given the current manifest. +#[derive(Debug, Clone, DeepSizeOf)] +pub enum Transaction { + /// A transaction based on a sequence of [UserOperation]s. Each + /// [UserOperation] contains a sequence of [Action]s. + V2(v2::Transaction), + /// A transaction based on an [Operation]. + V1(v1::Transaction), +} + +impl Transaction { + pub fn new_v1(read_version: u64, operation: Operation, blobs_op: Option<Operation>) -> Self { + let uuid = uuid::Uuid::new_v4().hyphenated().to_string(); + Self::V1(v1::Transaction { + read_version, + uuid, + operation, + blobs_op, + tag: None, + }) + } + + pub fn new_v2( + read_version: u64, + operations: Vec<UserOperation>, + blob_ops: Vec<UserOperation>, + ) -> Self { + let uuid = uuid::Uuid::new_v4().hyphenated().to_string(); + Self::V2(v2::Transaction { + read_version, + uuid, + operations, + blob_ops, + }) + } + + pub fn read_version(&self) -> u64 { + match self { + Self::V1(v1) => v1.read_version, + Self::V2(v2) => v2.read_version, + } + } + + pub fn uuid(&self) -> &str { + match self { + Self::V1(v1) => &v1.uuid, + Self::V2(v2) => &v2.uuid, + } + } + + pub fn operation(&self) -> Option<&Operation> { + match self { + Self::V1(v1) => Some(&v1.operation), + Self::V2(_) => None, + } + } + + pub fn blob_transaction(&self, read_version: u64) -> Option<Self> { + match self { + Self::V1(v1) => v1.blobs_op.clone().map(|operation| { + Self::V1(v1::Transaction { + read_version, + uuid: v1.uuid.clone(), + operation, + blobs_op: None, + tag: None, + }) + }), + Self::V2(v2) => { + if !v2.blob_ops.is_empty() { + Some(Self::V2(v2::Transaction { + read_version, + uuid: v2.uuid.clone(), + operations: v2.blob_ops.clone(), + blob_ops: Vec::new(), + })) + } else { + None + } + } + } + } + + /// Returns true if this transaction cannot be committed if the other + /// transaction is committed first.
+ pub fn conflicts_with(&self, other: &Self) -> bool { + match (self, other) { + (Self::V1(a), Self::V1(b)) => a.operation.conflicts_with(&b.operation), + _ => todo!(), + } + } + + fn fragments_with_ids<'a, T>( + new_fragments: T, + fragment_id: &'a mut u64, + ) -> impl Iterator + 'a + where + T: IntoIterator + 'a, + { + new_fragments.into_iter().map(move |mut f| { + if f.id == 0 { + f.id = *fragment_id; + *fragment_id += 1; + } + f + }) + } + + fn data_storage_format_from_files( + fragments: &[Fragment], + user_requested: Option, + ) -> Result { + if let Some(file_version) = Fragment::try_infer_version(fragments)? { + // Ensure user-requested matches data files + if let Some(user_requested) = user_requested { + if user_requested != file_version { + return Err(Error::invalid_input( + format!("User requested data storage version ({}) does not match version in data files ({})", user_requested, file_version), + location!(), + )); + } + } + Ok(DataStorageFormat::new(file_version)) + } else { + // If no files use user-requested or default + Ok(user_requested + .map(DataStorageFormat::new) + .unwrap_or_default()) + } + } + + /// Restore an old manifest from the given version. + pub async fn restore_old_manifest( + object_store: &ObjectStore, + commit_handler: &dyn CommitHandler, + base_path: &Path, + version: u64, + config: &ManifestWriteConfig, + tx_path: &str, + ) -> Result<(Manifest, Vec)> { + let location = commit_handler + .resolve_version_location(base_path, version, &object_store.inner) + .await?; + let mut manifest = read_manifest(object_store, &location.path, location.size).await?; + manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); + manifest.transaction_file = Some(tx_path.to_string()); + let indices = read_manifest_indexes(object_store, &location.path, &manifest).await?; + Ok((manifest, indices)) + } + + /// Create a new manifest from the current manifest and the transaction. + /// + /// `current_manifest` should only be None if the dataset does not yet exist. + pub fn build_manifest( + &self, + current_manifest: Option<&Manifest>, + current_indices: Vec, + transaction_file_path: &str, + config: &ManifestWriteConfig, + new_blob_version: Option, + ) -> Result<(Manifest, Vec)> { + let Self::V1(transaction) = self else { + return Err(Error::NotSupported { + source: "Cannot build a manifest from a V2 transaction".into(), + location: location!(), + }); + }; + + if config.use_move_stable_row_ids + && current_manifest + .map(|m| !m.uses_move_stable_row_ids()) + .unwrap_or_default() + { + return Err(Error::NotSupported { + source: "Cannot enable stable row ids on existing dataset".into(), + location: location!(), + }); + } + + // Get the schema and the final fragment list + let schema = match transaction.operation { + Operation::Overwrite { ref schema, .. } => schema.clone(), + Operation::Merge { ref schema, .. } => schema.clone(), + Operation::Project { ref schema, .. } => schema.clone(), + _ => { + if let Some(current_manifest) = current_manifest { + current_manifest.schema.clone() + } else { + return Err(Error::Internal { + message: "Cannot create a new dataset without a schema".to_string(), + location: location!(), + }); + } + } + }; + + let mut fragment_id = if matches!(transaction.operation, Operation::Overwrite { .. 
}) { + 0 + } else { + current_manifest + .and_then(|m| m.max_fragment_id()) + .map(|id| id + 1) + .unwrap_or(0) + }; + let mut final_fragments = Vec::new(); + let mut final_indices = current_indices; + + let mut next_row_id = { + // Only use row ids if the feature flag is set already or + match (current_manifest, config.use_move_stable_row_ids) { + (Some(manifest), _) + if manifest.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0 => + { + Some(manifest.next_row_id) + } + (None, true) => Some(0), + (_, false) => None, + (Some(_), true) => { + return Err(Error::NotSupported { + source: "Cannot enable stable row ids on existing dataset".into(), + location: location!(), + }); + } + } + }; + + let maybe_existing_fragments = + current_manifest + .map(|m| m.fragments.as_ref()) + .ok_or_else(|| Error::Internal { + message: format!( + "No current manifest was provided while building manifest for operation {}", + transaction.operation.name() + ), + location: location!(), + }); + + match &transaction.operation { + Operation::Append { ref fragments } => { + final_fragments.extend(maybe_existing_fragments?.clone()); + let mut new_fragments = + Self::fragments_with_ids(fragments.clone(), &mut fragment_id) + .collect::>(); + if let Some(next_row_id) = &mut next_row_id { + Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; + } + final_fragments.extend(new_fragments); + } + Operation::Delete { + ref updated_fragments, + ref deleted_fragment_ids, + .. + } => { + // Remove the deleted fragments + final_fragments.extend(maybe_existing_fragments?.clone()); + final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id)); + final_fragments.iter_mut().for_each(|f| { + for updated in updated_fragments { + if updated.id == f.id { + *f = updated.clone(); + } + } + }); + Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) + } + Operation::Update { + removed_fragment_ids, + updated_fragments, + new_fragments, + } => { + final_fragments.extend(maybe_existing_fragments?.iter().filter_map(|f| { + if removed_fragment_ids.contains(&f.id) { + return None; + } + if let Some(updated) = updated_fragments.iter().find(|uf| uf.id == f.id) { + Some(updated.clone()) + } else { + Some(f.clone()) + } + })); + let mut new_fragments = + Self::fragments_with_ids(new_fragments.clone(), &mut fragment_id) + .collect::>(); + if let Some(next_row_id) = &mut next_row_id { + Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; + } + final_fragments.extend(new_fragments); + Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) + } + Operation::Overwrite { ref fragments, .. 
} => { + let mut new_fragments = + Self::fragments_with_ids(fragments.clone(), &mut fragment_id) + .collect::>(); + if let Some(next_row_id) = &mut next_row_id { + Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; + } + final_fragments.extend(new_fragments); + final_indices = Vec::new(); + } + Operation::Rewrite { + ref groups, + ref rewritten_indices, + } => { + final_fragments.extend(maybe_existing_fragments?.clone()); + let current_version = current_manifest.map(|m| m.version).unwrap_or_default(); + Self::handle_rewrite_fragments( + &mut final_fragments, + groups, + &mut fragment_id, + current_version, + )?; + + if next_row_id.is_some() { + // We can re-use indices, but need to rewrite the fragment bitmaps + debug_assert!(rewritten_indices.is_empty()); + for index in final_indices.iter_mut() { + if let Some(fragment_bitmap) = &mut index.fragment_bitmap { + *fragment_bitmap = + Self::recalculate_fragment_bitmap(fragment_bitmap, groups)?; + } + } + } else { + Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?; + } + } + Operation::CreateIndex { + new_indices, + removed_indices, + } => { + final_fragments.extend(maybe_existing_fragments?.clone()); + final_indices.retain(|existing_index| { + !new_indices + .iter() + .any(|new_index| new_index.name == existing_index.name) + && !removed_indices + .iter() + .any(|old_index| old_index.uuid == existing_index.uuid) + }); + final_indices.extend(new_indices.clone()); + } + Operation::ReserveFragments { .. } => { + final_fragments.extend(maybe_existing_fragments?.clone()); + } + Operation::Merge { ref fragments, .. } => { + final_fragments.extend(fragments.clone()); + + // Some fields that have indices may have been removed, so we should + // remove those indices as well. + Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) + } + Operation::Project { .. } => { + final_fragments.extend(maybe_existing_fragments?.clone()); + + // We might have removed all fields for certain data files, so + // we should remove the data files that are no longer relevant. + let remaining_field_ids = schema + .fields_pre_order() + .map(|f| f.id) + .collect::>(); + for fragment in final_fragments.iter_mut() { + fragment.files.retain(|file| { + file.fields + .iter() + .any(|field_id| remaining_field_ids.contains(field_id)) + }); + } + + // Some fields that have indices may have been removed, so we should + // remove those indices as well. + Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) + } + Operation::Restore { .. } => { + unreachable!() + } + Operation::UpdateConfig { .. } => {} + }; + + // If a fragment was reserved then it may not belong at the end of the fragments list. + final_fragments.sort_by_key(|frag| frag.id); + + let user_requested_version = match (&config.storage_format, config.use_legacy_format) { + (Some(storage_format), _) => Some(storage_format.lance_file_version()?), + (None, Some(true)) => Some(LanceFileVersion::Legacy), + (None, Some(false)) => Some(LanceFileVersion::V2_0), + (None, None) => None, + }; + + let mut manifest = if let Some(current_manifest) = current_manifest { + let mut prev_manifest = Manifest::new_from_previous( + current_manifest, + schema, + Arc::new(final_fragments), + new_blob_version, + ); + if user_requested_version.is_some() + && matches!(transaction.operation, Operation::Overwrite { .. }) + { + // If this is an overwrite operation and the user has requested a specific version + // then overwrite with that version. 
Otherwise, if the user didn't request a specific + // version, then overwrite with whatever version we had before. + prev_manifest.data_storage_format = + DataStorageFormat::new(user_requested_version.unwrap()); + } + prev_manifest + } else { + let data_storage_format = + Self::data_storage_format_from_files(&final_fragments, user_requested_version)?; + Manifest::new( + schema, + Arc::new(final_fragments), + data_storage_format, + new_blob_version, + ) + }; + + manifest.tag.clone_from(&transaction.tag); + + if config.auto_set_feature_flags { + apply_feature_flags(&mut manifest, config.use_move_stable_row_ids)?; + } + manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); + + manifest.update_max_fragment_id(); + + match &transaction.operation { + Operation::Overwrite { + config_upsert_values: Some(tm), + .. + } => manifest.update_config(tm.clone()), + Operation::UpdateConfig { + upsert_values, + delete_keys, + } => { + // Delete is handled first. If the same key is referenced by upsert and + // delete, then upserted key-value pair will remain. + if let Some(delete_keys) = delete_keys { + manifest.delete_config_keys( + delete_keys + .iter() + .map(|s| s.as_str()) + .collect::>() + .as_slice(), + ) + } + if let Some(upsert_values) = upsert_values { + manifest.update_config(upsert_values.clone()); + } + } + _ => {} + } + + if let Operation::ReserveFragments { num_fragments } = transaction.operation { + manifest.max_fragment_id += num_fragments; + } + + manifest.transaction_file = Some(transaction_file_path.to_string()); + + if let Some(next_row_id) = next_row_id { + manifest.next_row_id = next_row_id; + } + + Ok((manifest, final_indices)) + } + + fn retain_relevant_indices(indices: &mut Vec, schema: &Schema, fragments: &[Fragment]) { + let field_ids = schema + .fields_pre_order() + .map(|f| f.id) + .collect::>(); + indices.retain(|existing_index| { + existing_index + .fields + .iter() + .all(|field_id| field_ids.contains(field_id)) + }); + + // We might have also removed all fragments that an index was covering, so + // we should remove those indices as well. + let fragment_ids = fragments.iter().map(|f| f.id).collect::>(); + indices.retain(|existing_index| { + existing_index + .fragment_bitmap + .as_ref() + .map(|bitmap| bitmap.iter().any(|id| fragment_ids.contains(&(id as u64)))) + .unwrap_or(true) + }); + } + + fn recalculate_fragment_bitmap( + old: &RoaringBitmap, + groups: &[RewriteGroup], + ) -> Result { + let mut new_bitmap = old.clone(); + for group in groups { + let any_in_index = group + .old_fragments + .iter() + .any(|frag| old.contains(frag.id as u32)); + let all_in_index = group + .old_fragments + .iter() + .all(|frag| old.contains(frag.id as u32)); + // Any rewrite group may or may not be covered by the index. However, if any fragment + // in a rewrite group was previously covered by the index then all fragments in the rewrite + // group must have been previously covered by the index. plan_compaction takes care of + // this for us so this should be safe to assume. 
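The bitmap surgery this comment describes can be sketched in isolation with `roaring` (fragment IDs are illustrative): a fully covered rewrite group has its old IDs removed from the index's fragment bitmap and its new IDs inserted.

```rust
use roaring::RoaringBitmap;

fn remap_fully_covered_group() {
    // The index covers fragments 0-3; a rewrite group replaces {1, 2} with {15}.
    let mut bitmap: RoaringBitmap = (0u32..4).collect();
    for old_id in [1u32, 2] {
        bitmap.remove(old_id);
    }
    bitmap.insert(15);
    // Untouched fragments stay covered; the rewritten ones are swapped out.
    assert!(bitmap.contains(0) && bitmap.contains(3) && bitmap.contains(15));
    assert!(!bitmap.contains(1) && !bitmap.contains(2));
}
```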
+ if any_in_index { + if all_in_index { + for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) { + new_bitmap.remove(frag_id); + } + new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32)); + } else { + return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data", location!())); + } + } + } + Ok(new_bitmap) + } + + fn handle_rewrite_indices( + indices: &mut [Index], + rewritten_indices: &[RewrittenIndex], + groups: &[RewriteGroup], + ) -> Result<()> { + let mut modified_indices = HashSet::new(); + + for rewritten_index in rewritten_indices { + if !modified_indices.insert(rewritten_index.old_id) { + return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id), location!())); + } + + let index = indices + .iter_mut() + .find(|idx| idx.uuid == rewritten_index.old_id) + .ok_or_else(|| { + Error::invalid_input( + format!( + "Invalid compaction plan refers to index {} which does not exist", + rewritten_index.old_id + ), + location!(), + ) + })?; + + index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap( + index.fragment_bitmap.as_ref().ok_or_else(|| { + Error::invalid_input( + format!( + "Cannot rewrite index {} which did not store fragment bitmap", + index.uuid + ), + location!(), + ) + })?, + groups, + )?); + index.uuid = rewritten_index.new_id; + } + Ok(()) + } + + fn handle_rewrite_fragments( + final_fragments: &mut Vec<Fragment>, + groups: &[RewriteGroup], + fragment_id: &mut u64, + version: u64, + ) -> Result<()> { + for group in groups { + // If the old fragments are contiguous, find the range + let replace_range = { + let start = final_fragments.iter().enumerate().find(|(_, f)| f.id == group.old_fragments[0].id) + .ok_or_else(|| Error::CommitConflict { version, source: + format!("dataset does not contain a fragment that a rewrite operation wants to replace: id={}", group.old_fragments[0].id).into(), location: location!()})?.0; + + // Verify old_fragments matches contiguous range + let mut i = 1; + loop { + if i == group.old_fragments.len() { + break Some(start..start + i); + } + if final_fragments[start + i].id != group.old_fragments[i].id { + break None; + } + i += 1; + } + }; + + let new_fragments = Self::fragments_with_ids(group.new_fragments.clone(), fragment_id); + if let Some(replace_range) = replace_range { + // Efficient path: splice the new fragments into the contiguous range + final_fragments.splice(replace_range, new_fragments); + } else { + // Slower path for non-contiguous ranges + for fragment in group.old_fragments.iter() { + final_fragments.retain(|f| f.id != fragment.id); + } + final_fragments.extend(new_fragments); + } + } + Ok(()) + } + + fn assign_row_ids(next_row_id: &mut u64, fragments: &mut [Fragment]) -> Result<()> { + for fragment in fragments { + let physical_rows = fragment.physical_rows.ok_or_else(|| Error::Internal { + message: "Fragment does not have physical rows".into(), + location: location!(), + })? as u64; + let row_ids = *next_row_id..(*next_row_id + physical_rows); + let sequence = RowIdSequence::from(row_ids); + // TODO: write to a separate file if large. Possibly share a file with other fragments.
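For a single fragment, the row-id assignment here amounts to carving a contiguous `u64` range and serializing it inline. A hedged sketch using the same `rowids` helpers this file imports (the standalone function is hypothetical):

```rust
use lance_table::format::RowIdMeta;
use lance_table::rowids::{write_row_ids, RowIdSequence};

// Assign `physical_rows` row ids starting at *next_row_id and return the
// inline metadata, mirroring one iteration of the loop above.
fn inline_row_ids(next_row_id: &mut u64, physical_rows: u64) -> RowIdMeta {
    let sequence = RowIdSequence::from(*next_row_id..(*next_row_id + physical_rows));
    *next_row_id += physical_rows;
    RowIdMeta::Inline(write_row_ids(&sequence))
}
```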
+ let serialized = write_row_ids(&sequence); + fragment.row_id_meta = Some(RowIdMeta::Inline(serialized)); + *next_row_id += physical_rows; + } + Ok(()) + } +} + +impl TryFrom for Transaction { + type Error = Error; + + fn try_from(message: pb::Transaction) -> Result { + match message.operation { + Some(pb::transaction::Operation::CompositeOperation( + pb::transaction::CompositeOperation { user_operations }, + )) => { + let operations = user_operations + .into_iter() + .map(UserOperation::try_from) + .collect::>()?; + if message.blob_operation.is_some() { + return Err(Error::NotSupported { + source: "Blob ops are not yet supported for composite operations".into(), + location: location!(), + }); + } + Ok(Self::V2(v2::Transaction { + read_version: message.read_version, + uuid: message.uuid.clone(), + operations, + blob_ops: Vec::new(), + })) + } + Some(op) => { + let operation = Operation::try_from(op)?; + let blobs_op = message + .blob_operation + .map(|blob_op| blob_op.try_into()) + .transpose()?; + Ok(Self::V1(v1::Transaction { + read_version: message.read_version, + uuid: message.uuid.clone(), + operation, + blobs_op, + tag: if message.tag.is_empty() { + None + } else { + Some(message.tag.clone()) + }, + })) + } + None => Err(Error::Internal { + message: "Transaction message did not contain an operation".to_string(), + location: location!(), + }), + } + } +} + +impl From<&Transaction> for pb::Transaction { + fn from(value: &Transaction) -> Self { + match value { + Transaction::V1(v1) => v1.into(), + Transaction::V2(v2) => v2.into(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rewrite_fragments() { + let existing_fragments: Vec = (0..10).map(Fragment::new).collect(); + + let mut final_fragments = existing_fragments; + let rewrite_groups = vec![ + // Since these are contiguous, they will be put in the same location + // as 1 and 2. + RewriteGroup { + old_fragments: vec![Fragment::new(1), Fragment::new(2)], + // These two fragments were previously reserved + new_fragments: vec![Fragment::new(15), Fragment::new(16)], + }, + // These are not contiguous, so they will be inserted at the end. + RewriteGroup { + old_fragments: vec![Fragment::new(5), Fragment::new(8)], + // We pretend this id was not reserved. Does not happen in practice today + // but we want to leave the door open. 
+ new_fragments: vec![Fragment::new(0)], + }, + ]; + + let mut fragment_id = 20; + let version = 0; + + Transaction::handle_rewrite_fragments( + &mut final_fragments, + &rewrite_groups, + &mut fragment_id, + version, + ) + .unwrap(); + + assert_eq!(fragment_id, 21); + + let expected_fragments: Vec<Fragment> = vec![ + Fragment::new(0), + Fragment::new(15), + Fragment::new(16), + Fragment::new(3), + Fragment::new(4), + Fragment::new(6), + Fragment::new(7), + Fragment::new(9), + Fragment::new(20), + ]; + + assert_eq!(final_fragments, expected_fragments); + } +} diff --git a/rust/lance-table/src/format/transaction/v1/mod.rs b/rust/lance-table/src/format/transaction/v1/mod.rs new file mode 100644 index 0000000000..3dfcc81636 --- /dev/null +++ b/rust/lance-table/src/format/transaction/v1/mod.rs @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +mod operation; + +use deepsize::DeepSizeOf; +pub use operation::{validate_operation, Operation, RewriteGroup, RewrittenIndex}; + +use crate::format::pb; + +/// A change to a dataset that can be retried +/// +/// This contains enough information to be able to build the next manifest, +/// given the current manifest. +#[derive(Debug, Clone, DeepSizeOf)] +pub struct Transaction { + /// The version of the table this transaction is based off of. If this is + /// the first transaction, this should be 0. + pub read_version: u64, + pub uuid: String, + pub operation: Operation, + /// If the transaction modified the blobs dataset, this is the operation + /// to apply to the blobs dataset. + /// + /// If this is `None`, then the blobs dataset was not modified. + pub blobs_op: Option<Operation>, + pub tag: Option<String>, +} + +impl From<&Transaction> for pb::Transaction { + fn from(value: &Transaction) -> Self { + let operation = (&value.operation).into(); + + let blob_operation = value.blobs_op.as_ref().map(|op| op.into()); + + Self { + read_version: value.read_version, + uuid: value.uuid.clone(), + operation: Some(operation), + blob_operation, + tag: value.tag.clone().unwrap_or("".to_string()), + } + } +} diff --git a/rust/lance-table/src/format/transaction/v1/operation.rs b/rust/lance-table/src/format/transaction/v1/operation.rs new file mode 100644 index 0000000000..5202b22dc8 --- /dev/null +++ b/rust/lance-table/src/format/transaction/v1/operation.rs @@ -0,0 +1,1029 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::collections::{HashMap, HashSet}; + +use deepsize::DeepSizeOf; +use lance_core::datatypes::Schema; +use lance_file::datatypes::Fields; + +use crate::format::pb; +use crate::format::{pb::IndexMetadata, Fragment, Index, Manifest}; +use lance_core::{Error, Result}; +use snafu::{location, Location}; +use uuid::Uuid; + +/// An operation on a dataset. +#[derive(Debug, Clone, DeepSizeOf)] +pub enum Operation { + /// Adding new fragments to the dataset. The fragments contained within + /// haven't yet been assigned a final ID. + Append { fragments: Vec<Fragment> }, + /// Updated fragments contain those that have been modified with new deletion + /// files. The deleted fragment IDs are those that should be removed from + /// the manifest. + Delete { + updated_fragments: Vec<Fragment>, + deleted_fragment_ids: Vec<u64>, + predicate: String, + }, + /// Overwrite the entire dataset with the given fragments. This is also + /// used when initially creating a table. + Overwrite { + fragments: Vec<Fragment>, + schema: Schema, + config_upsert_values: Option<HashMap<String, String>>, + }, + /// A new index has been created. + CreateIndex { + /// The new secondary indices that are being added + new_indices: Vec<Index>, + /// The indices that have been removed. + removed_indices: Vec<Index>, + }, + /// Data is rewritten but *not* modified. This is used for things like + /// compaction or re-ordering. Contains the old fragments and the new + /// ones that have been replaced. + /// + /// This operation will modify the row addresses of existing rows and + /// so any existing index covering a rewritten fragment will need to be + /// remapped. + Rewrite { + /// Groups of fragments that have been modified + groups: Vec<RewriteGroup>, + /// Indices that have been updated with the new row addresses + rewritten_indices: Vec<RewrittenIndex>, + }, + /// Merge a new column in + Merge { + fragments: Vec<Fragment>, + schema: Schema, + }, + /// Restore an old version of the dataset + Restore { version: u64 }, + /// Reserves fragment ids for future use. + /// This can be used when row ids need to be known before a transaction + /// has been committed. It is used during a rewrite operation to allow + /// indices to be remapped to the new row ids as part of the operation. + ReserveFragments { num_fragments: u32 }, + + /// Update values in the dataset. + Update { + /// Ids of fragments that have been removed + removed_fragment_ids: Vec<u64>, + /// Fragments that have been updated + updated_fragments: Vec<Fragment>, + /// Fragments that have been added + new_fragments: Vec<Fragment>, + }, + + /// Project to a new schema. This only changes the schema, not the data. + Project { schema: Schema }, + + /// Update the dataset configuration. + UpdateConfig { + upsert_values: Option<HashMap<String, String>>, + delete_keys: Option<Vec<String>>, + }, +} + +#[derive(Debug, Clone)] +pub struct RewrittenIndex { + pub old_id: Uuid, + pub new_id: Uuid, +} + +impl DeepSizeOf for RewrittenIndex { + fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize { + 0 + } +} + +#[derive(Debug, Clone, DeepSizeOf)] +pub struct RewriteGroup { + pub old_fragments: Vec<Fragment>, + pub new_fragments: Vec<Fragment>, +} + +impl Operation { + /// Returns the IDs of fragments that have been modified by this operation. + /// + /// This does not include new fragments. + fn modified_fragment_ids(&self) -> Box<dyn Iterator<Item = u64> + '_> { + match self { + // These operations add new fragments or don't modify any. + Self::Append { .. } + | Self::Overwrite { .. } + | Self::CreateIndex { .. } + | Self::ReserveFragments { .. } + | Self::Project { .. } + | Self::UpdateConfig { .. } + | Self::Restore { .. } => Box::new(std::iter::empty()), + Self::Delete { + updated_fragments, + deleted_fragment_ids, + .. + } => Box::new( + updated_fragments + .iter() + .map(|f| f.id) + .chain(deleted_fragment_ids.iter().copied()), + ), + Self::Rewrite { groups, .. } => Box::new( + groups + .iter() + .flat_map(|f| f.old_fragments.iter().map(|f| f.id)), + ), + Self::Merge { fragments, .. } => Box::new(fragments.iter().map(|f| f.id)), + Self::Update { + updated_fragments, + removed_fragment_ids, + .. + } => Box::new( + updated_fragments + .iter() + .map(|f| f.id) + .chain(removed_fragment_ids.iter().copied()), + ), + } + } + + /// Returns the config keys that have been upserted by this operation. + fn get_upsert_config_keys(&self) -> Vec<String> { + match self { + Self::Overwrite { + config_upsert_values: Some(upsert_values), + .. + } => { + let vec: Vec<String> = upsert_values.keys().cloned().collect(); + vec + } + Self::UpdateConfig { + upsert_values: Some(uv), + .. + } => { + let vec: Vec<String> = uv.keys().cloned().collect(); + vec + } + _ => Vec::<String>::new(), + } + } + + /// Returns the config keys that have been deleted by this operation. + fn get_delete_config_keys(&self) -> Vec<String> { + match self { + Self::UpdateConfig { + delete_keys: Some(dk), + .. + } => dk.clone(), + _ => Vec::<String>::new(), + } + } + + /// Check whether another operation modifies the same fragment IDs as this one. + fn modifies_same_ids(&self, other: &Self) -> bool { + let self_ids = self.modified_fragment_ids().collect::<HashSet<_>>(); + let mut other_ids = other.modified_fragment_ids(); + other_ids.any(|id| self_ids.contains(&id)) + } + + /// Check whether either operation upserts a key that is referenced by the other operation. + fn upsert_key_conflict(&self, other: &Self) -> bool { + let self_upsert_keys = self.get_upsert_config_keys(); + let other_upsert_keys = other.get_upsert_config_keys(); + + let self_delete_keys = self.get_delete_config_keys(); + let other_delete_keys = other.get_delete_config_keys(); + + self_upsert_keys + .iter() + .any(|x| other_upsert_keys.contains(x) || other_delete_keys.contains(x)) + || other_upsert_keys + .iter() + .any(|x| self_upsert_keys.contains(x) || self_delete_keys.contains(x)) + } + + pub fn name(&self) -> &str { + match self { + Self::Append { .. } => "Append", + Self::Delete { .. } => "Delete", + Self::Overwrite { .. } => "Overwrite", + Self::CreateIndex { .. } => "CreateIndex", + Self::Rewrite { .. } => "Rewrite", + Self::Merge { .. } => "Merge", + Self::ReserveFragments { .. } => "ReserveFragments", + Self::Restore { .. } => "Restore", + Self::Update { .. } => "Update", + Self::Project { .. } => "Project", + Self::UpdateConfig { .. } => "UpdateConfig", + } + } + + /// Returns true if this transaction cannot be committed if the other + /// transaction is committed first. + pub fn conflicts_with(&self, other: &Self) -> bool { + // This assumes IsolationLevel is Snapshot Isolation, which is more + // permissive than Serializable. In particular, it allows a Delete + // transaction to succeed after a concurrent Append, even if the Append + // added rows that would be deleted. + match &self { + Self::Append { .. } => match &other { + // Append is compatible with anything that doesn't change the schema + Self::Append { .. } => false, + Self::Rewrite { .. } => false, + Self::CreateIndex { .. } => false, + Self::Delete { .. } | Self::Update { .. } => false, + Self::ReserveFragments { .. } => false, + Self::Project { .. } => false, + Self::UpdateConfig { .. } => false, + _ => true, + }, + Self::Rewrite { .. } => match &other { + // Rewrite is only compatible with operations that don't touch + // existing fragments. + // TODO: it could also be compatible with operations that update + // fragments we don't touch. + Self::Append { .. } => false, + Self::ReserveFragments { .. } => false, + Self::Delete { .. } | Self::Rewrite { .. } | Self::Update { .. } => { + // As long as they rewrite disjoint fragments they shouldn't conflict. + self.modifies_same_ids(other) + } + Self::Project { .. } => false, + Self::UpdateConfig { .. } => false, + _ => true, + }, + // Restore always succeeds + Self::Restore { .. } => false, + // ReserveFragments is compatible with anything that doesn't reset the + // max fragment id. + Self::ReserveFragments { .. } => { + matches!(&other, Self::Overwrite { .. } | Self::Restore { .. }) + } + Self::CreateIndex { .. } => match &other { + Self::Append { .. } => false, + // Indices are identified by UUIDs, so they shouldn't conflict. + Self::CreateIndex { .. } => false, + // Although some of the rows we indexed may have been deleted / moved, + // row ids are still valid, so we allow this optimistically. + Self::Delete { .. } | Self::Update { .. } => false, + // Merge & reserve don't change row ids, so this should be fine. + Self::Merge { .. } => false, + Self::ReserveFragments { .. } => false, + // Rewrite likely changed many of the row ids, so our index is + // likely useless. It should be rebuilt. + // TODO: we could be smarter here and only invalidate the index + // if the rewrite changed more than X% of row ids. + Self::Rewrite { .. } => true, + Self::UpdateConfig { .. } => false, + _ => true, + }, + Self::Delete { .. } | Self::Update { .. } => match &other { + Self::CreateIndex { .. } => false, + Self::ReserveFragments { .. } => false, + Self::Delete { .. } | Self::Rewrite { .. } | Self::Update { .. } => { + // If we update the same fragments, we conflict. + self.modifies_same_ids(other) + } + Self::Project { .. } => false, + Self::Append { .. } => false, + Self::UpdateConfig { .. } => false, + _ => true, + }, + Self::Overwrite { .. } | Self::UpdateConfig { .. } => match &other { + Self::Overwrite { .. } | Self::UpdateConfig { .. } => { + self.upsert_key_conflict(other) + } + _ => false, + }, + // Merge changes the schema, but preserves row ids, so the only operations + // it's compatible with are CreateIndex, ReserveFragments, and UpdateConfig. + Self::Merge { .. } => !matches!( + &other, + Self::CreateIndex { .. } + | Self::ReserveFragments { .. } + | Self::UpdateConfig { .. } + ), + Self::Project { .. } => match &other { + // Project is compatible with anything that doesn't change the schema + Self::CreateIndex { .. } => false, + Self::Overwrite { .. } => false, + Self::UpdateConfig { .. } => false, + _ => true, + }, + } + } +} + +/// Validate that the operation is valid for the given manifest. +pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> Result<()> { + let manifest = match (manifest, operation) { + ( + None, + Operation::Overwrite { + fragments, + schema, + config_upsert_values: None, + }, + ) => { + // Validate here because we are going to return early. + schema_fragments_valid(schema, fragments)?; + + return Ok(()); + } + (Some(manifest), _) => manifest, + (None, _) => { + return Err(Error::invalid_input( + format!( + "Cannot apply operation {} to non-existent dataset", + operation.name() + ), + location!(), + )); + } + }; + + match operation { + Operation::Append { fragments } => { + // Fragments must contain all fields in the schema + schema_fragments_valid(&manifest.schema, fragments) + } + Operation::Project { schema } => { + schema_fragments_valid(schema, manifest.fragments.as_ref()) + } + Operation::Merge { fragments, schema } + | Operation::Overwrite { + fragments, + schema, + config_upsert_values: None, + } => schema_fragments_valid(schema, fragments), + Operation::Update { + updated_fragments, + new_fragments, + .. + } => { + schema_fragments_valid(&manifest.schema, updated_fragments)?; + schema_fragments_valid(&manifest.schema, new_fragments) + } + _ => Ok(()), + } +} + +/// Check that each fragment contains all fields in the schema. +/// It is not required that the schema contains all fields in the fragment. +/// There may be masked fields. +fn schema_fragments_valid(schema: &Schema, fragments: &[Fragment]) -> Result<()> { + // TODO: add additional validation. Consider consolidating with various + // validate() methods in the codebase.
+ for fragment in fragments { + for field in schema.fields_pre_order() { + if !fragment + .files + .iter() + .flat_map(|f| f.fields.iter()) + .any(|f_id| f_id == &field.id) + { + return Err(Error::invalid_input( + format!( + "Fragment {} does not contain field {:?}", + fragment.id, field + ), + location!(), + )); + } + } + } + Ok(()) +} + +impl From<&RewrittenIndex> for pb::transaction::rewrite::RewrittenIndex { + fn from(value: &RewrittenIndex) -> Self { + Self { + old_id: Some((&value.old_id).into()), + new_id: Some((&value.new_id).into()), + } + } +} + +impl From<&RewriteGroup> for pb::transaction::rewrite::RewriteGroup { + fn from(value: &RewriteGroup) -> Self { + Self { + old_fragments: value + .old_fragments + .iter() + .map(pb::DataFragment::from) + .collect(), + new_fragments: value + .new_fragments + .iter() + .map(pb::DataFragment::from) + .collect(), + } + } +} + +impl TryFrom<&pb::transaction::rewrite::RewrittenIndex> for RewrittenIndex { + type Error = Error; + + fn try_from(message: &pb::transaction::rewrite::RewrittenIndex) -> Result { + Ok(Self { + old_id: message + .old_id + .as_ref() + .map(Uuid::try_from) + .ok_or_else(|| { + Error::io( + "required field (old_id) missing from message".to_string(), + location!(), + ) + })??, + new_id: message + .new_id + .as_ref() + .map(Uuid::try_from) + .ok_or_else(|| { + Error::io( + "required field (new_id) missing from message".to_string(), + location!(), + ) + })??, + }) + } +} + +impl TryFrom for RewriteGroup { + type Error = Error; + + fn try_from(message: pb::transaction::rewrite::RewriteGroup) -> Result { + Ok(Self { + old_fragments: message + .old_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + new_fragments: message + .new_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + }) + } +} + +impl TryFrom for Operation { + type Error = Error; + + fn try_from(operation: pb::transaction::Operation) -> std::result::Result { + match operation { + pb::transaction::Operation::Append(pb::transaction::Append { fragments }) => { + Ok(Self::Append { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + }) + } + pb::transaction::Operation::Delete(pb::transaction::Delete { + updated_fragments, + deleted_fragment_ids, + predicate, + }) => Ok(Self::Delete { + updated_fragments: updated_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + deleted_fragment_ids, + predicate, + }), + pb::transaction::Operation::Overwrite(pb::transaction::Overwrite { + fragments, + schema, + schema_metadata, + config_upsert_values, + }) => { + let config_upsert_option = if config_upsert_values.is_empty() { + Some(config_upsert_values) + } else { + None + }; + + Ok(Self::Overwrite { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + schema: convert_schema(schema, schema_metadata)?, + config_upsert_values: config_upsert_option, + }) + } + pb::transaction::Operation::ReserveFragments(pb::transaction::ReserveFragments { + num_fragments, + }) => Ok(Self::ReserveFragments { num_fragments }), + pb::transaction::Operation::Rewrite(pb::transaction::Rewrite { + old_fragments, + new_fragments, + groups, + rewritten_indices, + }) => { + let groups = if !groups.is_empty() { + groups + .into_iter() + .map(RewriteGroup::try_from) + .collect::>()? 
+ } else { + vec![RewriteGroup { + old_fragments: old_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + new_fragments: new_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + }] + }; + let rewritten_indices = rewritten_indices + .iter() + .map(RewrittenIndex::try_from) + .collect::>()?; + + Ok(Self::Rewrite { + groups, + rewritten_indices, + }) + } + pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex { + new_indices, + removed_indices, + }) => Ok(Self::CreateIndex { + new_indices: new_indices + .into_iter() + .map(Index::try_from) + .collect::>()?, + removed_indices: removed_indices + .into_iter() + .map(Index::try_from) + .collect::>()?, + }), + pb::transaction::Operation::Merge(pb::transaction::Merge { + fragments, + schema, + schema_metadata, + }) => Ok(Self::Merge { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + schema: convert_schema(schema, schema_metadata)?, + }), + pb::transaction::Operation::Restore(pb::transaction::Restore { version }) => { + Ok(Self::Restore { version }) + } + pb::transaction::Operation::Update(pb::transaction::Update { + removed_fragment_ids, + updated_fragments, + new_fragments, + }) => Ok(Self::Update { + removed_fragment_ids, + updated_fragments: updated_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + new_fragments: new_fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + }), + pb::transaction::Operation::Project(pb::transaction::Project { schema }) => { + Ok(Self::Project { + schema: Schema::from(&Fields(schema)), + }) + } + pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig { + upsert_values, + delete_keys, + }) => { + let upsert_values = match upsert_values.len() { + 0 => None, + _ => Some(upsert_values), + }; + let delete_keys = match delete_keys.len() { + 0 => None, + _ => Some(delete_keys), + }; + Ok(Self::UpdateConfig { + upsert_values, + delete_keys, + }) + } + pb::transaction::Operation::CompositeOperation(_) => Err(Error::invalid_input( + "CompositeOperation cannot be converted to Operation".to_string(), + location!(), + )), + } + } +} + +impl TryFrom for Operation { + type Error = Error; + + fn try_from(blob_op: pb::transaction::BlobOperation) -> std::result::Result { + match blob_op { + pb::transaction::BlobOperation::BlobAppend(pb::transaction::Append { fragments }) => { + Result::Ok(Self::Append { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + }) + } + pb::transaction::BlobOperation::BlobOverwrite(pb::transaction::Overwrite { + fragments, + schema, + schema_metadata, + config_upsert_values, + }) => { + let config_upsert_option = if config_upsert_values.is_empty() { + Some(config_upsert_values) + } else { + None + }; + + Ok(Self::Overwrite { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + schema: convert_schema(schema, schema_metadata)?, + config_upsert_values: config_upsert_option, + }) + } + } + } +} + +impl From<&Operation> for pb::transaction::Operation { + fn from(operation: &Operation) -> Self { + match operation { + Operation::Append { fragments } => Self::Append(pb::transaction::Append { + fragments: fragments.iter().map(pb::DataFragment::from).collect(), + }), + Operation::Delete { + updated_fragments, + deleted_fragment_ids, + predicate, + } => Self::Delete(pb::transaction::Delete { + updated_fragments: updated_fragments + .iter() + .map(pb::DataFragment::from) + .collect(), + 
deleted_fragment_ids: deleted_fragment_ids.clone(), + predicate: predicate.clone(), + }), + Operation::Overwrite { + fragments, + schema, + config_upsert_values, + } => Self::Overwrite(pb::transaction::Overwrite { + fragments: fragments.iter().map(pb::DataFragment::from).collect(), + schema: Fields::from(schema).0, + schema_metadata: extract_metadata(&schema.metadata), + config_upsert_values: config_upsert_values.clone().unwrap_or(Default::default()), + }), + Operation::ReserveFragments { num_fragments } => { + Self::ReserveFragments(pb::transaction::ReserveFragments { + num_fragments: *num_fragments, + }) + } + Operation::Rewrite { + groups, + rewritten_indices, + } => Self::Rewrite(pb::transaction::Rewrite { + groups: groups + .iter() + .map(pb::transaction::rewrite::RewriteGroup::from) + .collect(), + rewritten_indices: rewritten_indices + .iter() + .map(|rewritten| rewritten.into()) + .collect(), + ..Default::default() + }), + Operation::CreateIndex { + new_indices, + removed_indices, + } => Self::CreateIndex(pb::transaction::CreateIndex { + new_indices: new_indices.iter().map(IndexMetadata::from).collect(), + removed_indices: removed_indices.iter().map(IndexMetadata::from).collect(), + }), + Operation::Merge { fragments, schema } => Self::Merge(pb::transaction::Merge { + fragments: fragments.iter().map(pb::DataFragment::from).collect(), + schema: Fields::from(schema).0, + schema_metadata: extract_metadata(&schema.metadata), + }), + Operation::Restore { version } => { + Self::Restore(pb::transaction::Restore { version: *version }) + } + Operation::Update { + removed_fragment_ids, + updated_fragments, + new_fragments, + } => Self::Update(pb::transaction::Update { + removed_fragment_ids: removed_fragment_ids.clone(), + updated_fragments: updated_fragments + .iter() + .map(pb::DataFragment::from) + .collect(), + new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(), + }), + Operation::Project { schema } => Self::Project(pb::transaction::Project { + schema: Fields::from(schema).0, + }), + Operation::UpdateConfig { + upsert_values, + delete_keys, + } => Self::UpdateConfig(pb::transaction::UpdateConfig { + upsert_values: upsert_values.clone().unwrap_or(Default::default()), + delete_keys: delete_keys.clone().unwrap_or(Default::default()), + }), + } + } +} + +impl From<&Operation> for pb::transaction::BlobOperation { + fn from(operation: &Operation) -> Self { + match operation { + Operation::Append { fragments } => Self::BlobAppend(pb::transaction::Append { + fragments: fragments.iter().map(pb::DataFragment::from).collect(), + }), + Operation::Overwrite { + fragments, + schema, + config_upsert_values, + } => Self::BlobOverwrite(pb::transaction::Overwrite { + fragments: fragments.iter().map(pb::DataFragment::from).collect(), + schema: Fields::from(schema).0, + schema_metadata: extract_metadata(&schema.metadata), + config_upsert_values: config_upsert_values.clone().unwrap_or(Default::default()), + }), + _ => unimplemented!(), + } + } +} + +fn convert_schema( + fields: Vec, + metadata: HashMap>, +) -> Result { + let mut schema = Schema::from(&Fields(fields)); + schema.metadata = metadata + .into_iter() + .map(|(k, v)| { + let value = String::from_utf8(v).map_err(|err| { + Error::invalid_input( + format!("Schema metadata value is not valid UTF-8: {}", err), + location!(), + ) + })?; + Ok((k, value)) + }) + .collect::>()?; + Ok(schema) +} + +fn extract_metadata(metadata: &HashMap) -> HashMap> { + metadata + .iter() + .map(|(k, v)| (k.clone(), v.clone().into_bytes())) + 
.collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_conflicts() { + let index0 = Index { + uuid: uuid::Uuid::new_v4(), + name: "test".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: None, + index_details: None, + }; + let fragment0 = Fragment::new(0); + let fragment1 = Fragment::new(1); + let fragment2 = Fragment::new(2); + // The transactions that will be checked against + let other_operations = [ + Operation::Append { + fragments: vec![fragment0.clone()], + }, + Operation::CreateIndex { + new_indices: vec![index0.clone()], + removed_indices: vec![index0.clone()], + }, + Operation::Delete { + updated_fragments: vec![fragment0.clone()], + deleted_fragment_ids: vec![2], + predicate: "x > 2".to_string(), + }, + Operation::Merge { + fragments: vec![fragment0.clone(), fragment2.clone()], + schema: Schema::default(), + }, + Operation::Overwrite { + fragments: vec![fragment0.clone(), fragment2.clone()], + schema: Schema::default(), + config_upsert_values: Some(HashMap::from_iter(vec![( + "overwrite-key".to_string(), + "value".to_string(), + )])), + }, + Operation::Rewrite { + groups: vec![RewriteGroup { + old_fragments: vec![fragment0.clone()], + new_fragments: vec![fragment1.clone()], + }], + rewritten_indices: vec![], + }, + Operation::ReserveFragments { num_fragments: 3 }, + Operation::Update { + removed_fragment_ids: vec![1], + updated_fragments: vec![fragment0.clone()], + new_fragments: vec![fragment2.clone()], + }, + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "lance.test".to_string(), + "value".to_string(), + )])), + delete_keys: Some(vec!["remove-key".to_string()]), + }, + ]; + + // Transactions and whether they are expected to conflict with each + // of other_transactions + let cases = [ + ( + Operation::Append { + fragments: vec![fragment0.clone()], + }, + [false, false, false, true, true, false, false, false, false], + ), + ( + Operation::Delete { + // Delete that affects fragments different from other transactions + updated_fragments: vec![fragment1.clone()], + deleted_fragment_ids: vec![], + predicate: "x > 2".to_string(), + }, + [false, false, false, true, true, false, false, true, false], + ), + ( + Operation::Delete { + // Delete that affects same fragments as other transactions + updated_fragments: vec![fragment0.clone(), fragment2.clone()], + deleted_fragment_ids: vec![], + predicate: "x > 2".to_string(), + }, + [false, false, true, true, true, true, false, true, false], + ), + ( + Operation::Overwrite { + fragments: vec![fragment0.clone(), fragment2.clone()], + schema: Schema::default(), + config_upsert_values: None, + }, + // No conflicts: overwrite can always happen since it doesn't + // depend on previous state of the table. + [ + false, false, false, false, false, false, false, false, false, + ], + ), + ( + Operation::CreateIndex { + new_indices: vec![index0.clone()], + removed_indices: vec![index0], + }, + // Will only conflict with operations that modify row ids. 
+ [false, false, false, false, true, true, false, false, false], + ), + ( + // Rewrite that affects different fragments + Operation::Rewrite { + groups: vec![RewriteGroup { + old_fragments: vec![fragment1], + new_fragments: vec![fragment0.clone()], + }], + rewritten_indices: Vec::new(), + }, + [false, true, false, true, true, false, false, true, false], + ), + ( + // Rewrite that affects the same fragments + Operation::Rewrite { + groups: vec![RewriteGroup { + old_fragments: vec![fragment0.clone(), fragment2.clone()], + new_fragments: vec![fragment0.clone()], + }], + rewritten_indices: Vec::new(), + }, + [false, true, true, true, true, true, false, true, false], + ), + ( + Operation::Merge { + fragments: vec![fragment0.clone(), fragment2.clone()], + schema: Schema::default(), + }, + // Merge conflicts with everything except CreateIndex and ReserveFragments. + [true, false, true, true, true, true, false, true, false], + ), + ( + Operation::ReserveFragments { num_fragments: 2 }, + // ReserveFragments only conflicts with Overwrite and Restore. + [false, false, false, false, true, false, false, false, false], + ), + ( + Operation::Update { + // Update that affects same fragments as other transactions + updated_fragments: vec![fragment0], + removed_fragment_ids: vec![], + new_fragments: vec![fragment2], + }, + [false, false, true, true, true, true, false, true, false], + ), + ( + // Update config that should not conflict with anything + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "other-key".to_string(), + "new-value".to_string(), + )])), + delete_keys: None, + }, + [ + false, false, false, false, false, false, false, false, false, + ], + ), + ( + // Update config that conflicts with key being upserted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "lance.test".to_string(), + "new-value".to_string(), + )])), + delete_keys: None, + }, + [false, false, false, false, false, false, false, false, true], + ), + ( + // Update config that conflicts with key being deleted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: Some(HashMap::from_iter(vec![( + "remove-key".to_string(), + "new-value".to_string(), + )])), + delete_keys: None, + }, + [false, false, false, false, false, false, false, false, true], + ), + ( + // Delete config keys currently being deleted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: None, + delete_keys: Some(vec!["remove-key".to_string()]), + }, + [ + false, false, false, false, false, false, false, false, false, + ], + ), + ( + // Delete config keys currently being upserted by other UpdateConfig operation + Operation::UpdateConfig { + upsert_values: None, + delete_keys: Some(vec!["lance.test".to_string()]), + }, + [false, false, false, false, false, false, false, false, true], + ), + ]; + + for (operation, expected_conflicts) in &cases { + for (other, expected_conflict) in other_operations.iter().zip(expected_conflicts) { + assert_eq!( + operation.conflicts_with(other), + *expected_conflict, + "Operation {:?} should {} with {:?}", + operation, + if *expected_conflict { + "conflict" + } else { + "not conflict" + }, + other + ); + } + } + } +} diff --git a/rust/lance-table/src/format/transaction/v2/action.rs b/rust/lance-table/src/format/transaction/v2/action.rs new file mode 100644 index 0000000000..37ce551890 --- /dev/null +++ b/rust/lance-table/src/format/transaction/v2/action.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: 
Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use crate::format::{pb, Fragment};
+use deepsize::DeepSizeOf;
+use lance_core::{Error, Result};
+use snafu::{location, Location};
+
+/// A change to a [`Manifest`].
+#[derive(Debug, Clone, DeepSizeOf)]
+pub enum Action {
+    // Fragment changes
+    AddFragments { fragments: Vec<Fragment> },
+    DeleteFragments { deleted_fragment_ids: Vec<u64> },
+    UpdateFragments { fragments: Vec<Fragment> },
+}
+
+impl TryFrom<pb::transaction::Action> for Action {
+    type Error = Error;
+
+    fn try_from(value: pb::transaction::Action) -> std::result::Result<Self, Self::Error> {
+        if let Some(action) = value.action {
+            Self::try_from(action)
+        } else {
+            Err(Error::NotSupported {
+                source: "No known action was found".into(),
+                location: location!(),
+            })
+        }
+    }
+}
+
+impl TryFrom<pb::transaction::action::Action> for Action {
+    type Error = Error;
+
+    fn try_from(value: pb::transaction::action::Action) -> std::result::Result<Self, Self::Error> {
+        use pb::transaction::action::Action::*;
+        match value {
+            AddFragments(action) => Ok(Self::AddFragments {
+                fragments: action
+                    .fragments
+                    .into_iter()
+                    .map(Fragment::try_from)
+                    .collect::<Result<Vec<_>>>()?,
+            }),
+            DeleteFragments(action) => Ok(Self::DeleteFragments {
+                deleted_fragment_ids: action.deleted_fragment_ids,
+            }),
+            UpdateFragments(action) => Ok(Self::UpdateFragments {
+                fragments: action
+                    .updated_fragments
+                    .into_iter()
+                    .map(Fragment::try_from)
+                    .collect::<Result<Vec<_>>>()?,
+            }),
+        }
+    }
+}
+
+impl From<&Action> for pb::transaction::Action {
+    fn from(value: &Action) -> Self {
+        use pb::transaction::action::{self as pb_action};
+        match value {
+            Action::AddFragments { fragments } => Self {
+                action: Some(pb_action::Action::AddFragments(pb_action::AddFragments {
+                    fragments: fragments.iter().map(Into::into).collect(),
+                })),
+            },
+            Action::DeleteFragments {
+                deleted_fragment_ids,
+            } => Self {
+                action: Some(pb_action::Action::DeleteFragments(
+                    pb_action::DeleteFragments {
+                        deleted_fragment_ids: deleted_fragment_ids.clone(),
+                    },
+                )),
+            },
+            Action::UpdateFragments { fragments } => Self {
+                action: Some(pb_action::Action::UpdateFragments(
+                    pb_action::UpdateFragments {
+                        updated_fragments: fragments.iter().map(Into::into).collect(),
+                    },
+                )),
+            },
+        }
+    }
+}
diff --git a/rust/lance-table/src/format/transaction/v2/mod.rs b/rust/lance-table/src/format/transaction/v2/mod.rs
new file mode 100644
index 0000000000..777724d5cf
--- /dev/null
+++ b/rust/lance-table/src/format/transaction/v2/mod.rs
@@ -0,0 +1,80 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+mod action;
+
+pub use action::Action;
+
+use crate::format::pb;
+use deepsize::DeepSizeOf;
+use lance_core::{Error, Result};
+
+#[derive(Debug, Clone, DeepSizeOf)]
+pub struct Transaction {
+    pub read_version: u64,
+    pub uuid: String,
+    pub operations: Vec<UserOperation>,
+    pub blob_ops: Vec<UserOperation>,
+}
+
+/// A group of actions that make up a user-facing operation.
+///
+/// For example, a user might call a `CREATE TABLE` statement, which includes
+/// both a [Action::ReplaceSchema] and an [Action::AddFragments].
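+///
+/// A hypothetical sketch of constructing one (values are illustrative, and
+/// `Action::ReplaceSchema` is not defined in this diff, so only a fragment
+/// action appears):
+///
+/// ```ignore
+/// let op = UserOperation {
+///     read_version: 1,
+///     uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
+///     description: "CREATE TABLE t (id INT);".to_string(),
+///     actions: vec![Action::AddFragments { fragments: vec![] }],
+/// };
+/// ```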
+#[derive(Debug, Clone, DeepSizeOf, Default)]
+pub struct UserOperation {
+    pub read_version: u64,
+    pub uuid: String,
+    pub description: String,
+    pub actions: Vec<Action>,
+}
+
+impl From<&UserOperation> for pb::transaction::UserOperation {
+    fn from(value: &UserOperation) -> Self {
+        Self {
+            read_version: value.read_version,
+            uuid: value.uuid.clone(),
+            description: value.description.clone(),
+            actions: value.actions.iter().map(Into::into).collect(),
+        }
+    }
+}
+
+impl TryFrom<pb::transaction::UserOperation> for UserOperation {
+    type Error = Error;
+
+    fn try_from(value: pb::transaction::UserOperation) -> std::result::Result<Self, Self::Error> {
+        Ok(Self {
+            read_version: value.read_version,
+            uuid: value.uuid,
+            description: value.description,
+            actions: value
+                .actions
+                .into_iter()
+                .map(TryInto::try_into)
+                .collect::<Result<Vec<_>>>()?,
+        })
+    }
+}
+
+impl From<&Transaction> for pb::Transaction {
+    fn from(value: &Transaction) -> Self {
+        let operation =
+            pb::transaction::Operation::CompositeOperation(pb::transaction::CompositeOperation {
+                user_operations: value.operations.iter().map(Into::into).collect(),
+            });
+
+        // TODO: Handle blob operations
+        if !value.blob_ops.is_empty() {
+            unimplemented!("Blob operations are not yet supported");
+        }
+
+        Self {
+            read_version: value.read_version,
+            uuid: value.uuid.clone(),
+            operation: Some(operation),
+            blob_operation: None,
+            tag: Default::default(),
+        }
+    }
+}
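A hypothetical round-trip sketch (not shown in the diff): building a v2
`Transaction` from the pieces above and encoding it via the `From` impl. The
`lance_table::format::transaction::v2` path and the public `pb` module are
assumptions inferred from this diff's module layout.

```rust
use lance_table::format::pb;
use lance_table::format::transaction::v2::{Action, Transaction, UserOperation};

fn encode_example() -> pb::Transaction {
    let txn = Transaction {
        read_version: 0,
        uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
        operations: vec![UserOperation {
            read_version: 0,
            uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
            description: "append one fragment".to_string(),
            // An empty fragment list keeps the sketch self-contained.
            actions: vec![Action::AddFragments { fragments: vec![] }],
        }],
        // Must stay empty: a non-empty list hits the `unimplemented!` branch.
        blob_ops: vec![],
    };
    // Encodes as Operation::CompositeOperation holding one UserOperation.
    pb::Transaction::from(&txn)
}
```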
diff --git a/rust/lance-table/src/utils.rs b/rust/lance-table/src/utils.rs
index 8e14f0ae9a..bb598802e4 100644
--- a/rust/lance-table/src/utils.rs
+++ b/rust/lance-table/src/utils.rs
@@ -1,6 +1,8 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
+use std::time::SystemTime;
+
 pub mod stream;
 pub trait LanceIteratorExtension {
@@ -45,3 +47,11 @@ impl<I: Iterator> Iterator for ExactSize<I> {
         (self.size, Some(self.size))
     }
 }
+
+pub fn timestamp_to_nanos(timestamp: Option<SystemTime>) -> u128 {
+    let timestamp = timestamp.unwrap_or_else(SystemTime::now);
+    timestamp
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos()
+}
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 58c6d8d140..82b6610a79 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -23,13 +23,15 @@ use lance_io::object_writer::ObjectWriter;
 use lance_io::traits::WriteExt;
 use lance_io::utils::{read_last_block, read_metadata_offset, read_struct};
 use lance_table::format::{
-    DataStorageFormat, Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION,
+    transaction::{ManifestWriteConfig, Operation, Transaction},
+    Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION,
 };
 use lance_table::io::commit::{
     migrate_scheme_to_v2, CommitError, CommitHandler, CommitLock, ManifestLocation,
     ManifestNamingScheme,
 };
 use lance_table::io::manifest::{read_manifest, write_manifest};
+use lance_table::utils::timestamp_to_nanos;
 use object_store::path::Path;
 use prost::Message;
 use rowids::get_row_id_index;
@@ -65,13 +67,12 @@ use self::cleanup::RemovalStats;
 use self::fragment::FileFragment;
 use self::refs::Tags;
 use self::scanner::{DatasetRecordBatchStream, Scanner};
-use self::transaction::{Operation, Transaction};
 use self::write::write_fragments_internal;
 use crate::datatypes::Schema;
 use crate::error::box_error;
 use crate::io::commit::{commit_detached_transaction, commit_new_dataset, commit_transaction};
 use crate::session::Session;
-use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime};
+use crate::utils::temporal::utc_now;
 use crate::{Error, Result};
 pub use blob::BlobFile;
 use hash_joiner::HashJoiner;
@@ -571,13 +572,12 @@ impl Dataset {
         let (latest_manifest, _) = self.latest_manifest().await?;
         let latest_version = latest_manifest.version;
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             latest_version,
             Operation::Restore {
                 version: self.manifest.version,
             },
             /*blobs_op=*/ None,
-            None,
         );
         let (restored_manifest, path) = commit_transaction(
@@ -656,7 +656,7 @@
             Ok,
         )?;
-        let transaction = Transaction::new(read_version, operation, blobs_op, None);
+        let transaction = Transaction::new_v1(read_version, operation, blobs_op);
         let mut builder = CommitBuilder::new(base_uri)
             .with_object_store_registry(object_store_registry)
@@ -914,7 +914,7 @@ impl Dataset {
             })
             .await?;
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::Delete {
                 updated_fragments,
@@ -925,7 +925,6 @@
             // rows that reference them are deleted.
             /*blobs_op=*/ None,
-            None,
         );
         let (manifest, path) = commit_transaction(
@@ -1445,7 +1444,7 @@
             .try_collect::<Vec<_>>()
             .await?;
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::Merge {
                 fragments: updated_fragments,
@@ -1454,7 +1453,6 @@
             // It is not possible to add blob columns using merge
             /*blobs_op=*/ None,
-            None,
         );
         let (manifest, manifest_path) = commit_transaction(
@@ -1500,14 +1498,13 @@
         &mut self,
         upsert_values: impl IntoIterator<Item = (String, String)>,
     ) -> Result<()> {
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::UpdateConfig {
                 upsert_values: Some(HashMap::from_iter(upsert_values)),
                 delete_keys: None,
             },
             /*blobs_op=*/ None,
-            None,
         );
         let (manifest, manifest_path) = commit_transaction(
@@ -1529,14 +1526,13 @@
     /// Delete keys from the config.
     pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::UpdateConfig {
                 upsert_values: None,
                 delete_keys: Some(Vec::from_iter(delete_keys.iter().map(ToString::to_string))),
             },
             /*blob_op=*/ None,
-            None,
         );
         let (manifest, manifest_path) = commit_transaction(
@@ -1568,27 +1564,6 @@ impl DatasetTakeRows for Dataset {
     }
 }
-#[derive(Debug)]
-pub(crate) struct ManifestWriteConfig {
-    auto_set_feature_flags: bool,              // default true
-    timestamp: Option<SystemTime>,             // default None
-    use_move_stable_row_ids: bool,             // default false
-    use_legacy_format: Option<bool>,           // default None
-    storage_format: Option<DataStorageFormat>, // default None
-}
-
-impl Default for ManifestWriteConfig {
-    fn default() -> Self {
-        Self {
-            auto_set_feature_flags: true,
-            timestamp: None,
-            use_move_stable_row_ids: false,
-            use_legacy_format: None,
-            storage_format: None,
-        }
-    }
-}
-
 /// Commit a manifest file and create a copy at the latest manifest path.
 pub(crate) async fn write_manifest_file(
     object_store: &ObjectStore,
@@ -2025,7 +2000,7 @@ mod tests {
         #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
         data_storage_version: LanceFileVersion,
     ) {
-        use lance_table::feature_flags::FLAG_UNKNOWN;
+        use lance_table::{feature_flags::FLAG_UNKNOWN, format::DataStorageFormat};
         let test_dir = tempdir().unwrap();
         let test_uri = test_dir.path().to_str().unwrap();
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 034168c26c..d5f1961920 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -588,13 +588,12 @@ async fn reserve_fragment_ids(
     dataset: &Dataset,
     fragments: impl ExactSizeIterator<Item = &mut Fragment>,
 ) -> Result<()> {
-    let transaction = Transaction::new(
+    let transaction = Transaction::new_v1(
         dataset.manifest.version,
         Operation::ReserveFragments {
             num_fragments: fragments.len() as u32,
         },
         /*blob_op=*/ None,
-        None,
     );
     let (manifest, _) = commit_transaction(
@@ -891,7 +890,7 @@ pub async fn commit_compaction(
         Vec::new()
     };
-    let transaction = Transaction::new(
+    let transaction = Transaction::new_v1(
         dataset.manifest.version,
         Operation::Rewrite {
             groups: rewrite_groups,
@@ -899,7 +898,6 @@
         },
         // TODO: Add a blob compaction pass
         /*blob_op= */ None,
-        None,
     );
     let (manifest, manifest_path) = commit_transaction(
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index 6e9993435a..b0143e8f2e 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -262,12 +262,11 @@ pub(super) async fn add_columns(
         .await?;
     let operation = Operation::Merge { fragments, schema };
-    let transaction = Transaction::new(
+    let transaction = Transaction::new_v1(
         dataset.manifest.version,
         operation,
         // TODO: Make it possible to add new blob columns
         /*blob_op= */ None,
-        None,
     );
     let (new_manifest, new_path) = commit_transaction(
         dataset,
@@ -506,12 +505,11 @@ pub(super) async fn alter_columns(
     // If we aren't casting a column, we don't need to touch the fragments.
     let transaction = if cast_fields.is_empty() {
-        Transaction::new(
+        Transaction::new_v1(
             dataset.manifest.version,
             Operation::Project { schema: new_schema },
             // TODO: Make it possible to alter blob columns
             /*blob_op= */ None,
-            None,
         )
     } else {
         // Otherwise, we need to re-write the relevant fields.
@@ -578,14 +576,13 @@
             })
             .collect::<Vec<_>>();
-        Transaction::new(
+        Transaction::new_v1(
             dataset.manifest.version,
             Operation::Merge {
                 schema: new_schema,
                 fragments,
             },
             /*blob_op= */ None,
-            None,
         )
     };
@@ -646,11 +643,10 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res
         ));
     }
-    let transaction = Transaction::new(
+    let transaction = Transaction::new_v1(
         dataset.manifest.version,
         Operation::Project { schema: new_schema },
         /*blob_op= */ None,
-        None,
     );
     let (manifest, manifest_path) = commit_transaction(
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index 558c0e9ba3..e1ab654284 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -1,1707 +1,4 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
-//! Transaction definitions for updating datasets
-//!
-//! Prior to creating a new manifest, a transaction must be created representing
-//! the changes being made to the dataset. By representing them as incremental
-//!
changes, we can detect whether concurrent operations are compatible with -//! one another. We can also rebuild manifests when retrying committing a -//! manifest. -//! -//! ## Conflict Resolution -//! -//! Transactions are compatible with one another if they don't conflict. -//! Currently, conflict resolution always assumes a Serializable isolation -//! level. -//! -//! Below are the compatibilities between conflicting transactions. The columns -//! represent the operation that has been applied, while the rows represent the -//! operation that is being checked for compatibility to see if it can retry. -//! ✅ indicates that the operation is compatible, while ❌ indicates that it is -//! a conflict. Some operations have additional conditions that must be met for -//! them to be compatible. -//! -//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | -//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------| -//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | -//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ | -//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) | -//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | -//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ | -//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | -//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | -//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) | -//! -//! (1) Delete, update, and rewrite are compatible with each other and themselves only if -//! they affect distinct fragments. Otherwise, they conflict. -//! (2) Operations that mutate the config conflict if one of the operations upserts a key -//! that if referenced by another concurrent operation. - -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - -use deepsize::DeepSizeOf; -use lance_core::{datatypes::Schema, Error, Result}; -use lance_file::{datatypes::Fields, version::LanceFileVersion}; -use lance_io::object_store::ObjectStore; -use lance_table::{ - format::{ - pb::{self, IndexMetadata}, - DataStorageFormat, Fragment, Index, Manifest, RowIdMeta, - }, - io::{ - commit::CommitHandler, - manifest::{read_manifest, read_manifest_indexes}, - }, - rowids::{write_row_ids, RowIdSequence}, -}; -use object_store::path::Path; -use roaring::RoaringBitmap; -use snafu::{location, Location}; -use uuid::Uuid; - -use super::ManifestWriteConfig; -use crate::utils::temporal::timestamp_to_nanos; -use lance_table::feature_flags::{apply_feature_flags, FLAG_MOVE_STABLE_ROW_IDS}; - -/// A change to a dataset that can be retried -/// -/// This contains enough information to be able to build the next manifest, -/// given the current manifest. -#[derive(Debug, Clone, DeepSizeOf)] -pub struct Transaction { - /// The version of the table this transaction is based off of. If this is - /// the first transaction, this should be 0. - pub read_version: u64, - pub uuid: String, - pub operation: Operation, - /// If the transaction modified the blobs dataset, this is the operation - /// to apply to the blobs dataset. - /// - /// If this is `None`, then the blobs dataset was not modified - pub blobs_op: Option, - pub tag: Option, -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub enum BlobsOperation { - /// The operation did not modify the blobs dataset - Unchanged, - /// The operation modified the blobs dataset, contains the new version of the blobs dataset - Updated(u64), -} - -/// An operation on a dataset. 
-#[derive(Debug, Clone, DeepSizeOf)] -pub enum Operation { - /// Adding new fragments to the dataset. The fragments contained within - /// haven't yet been assigned a final ID. - Append { fragments: Vec }, - /// Updated fragments contain those that have been modified with new deletion - /// files. The deleted fragment IDs are those that should be removed from - /// the manifest. - Delete { - updated_fragments: Vec, - deleted_fragment_ids: Vec, - predicate: String, - }, - /// Overwrite the entire dataset with the given fragments. This is also - /// used when initially creating a table. - Overwrite { - fragments: Vec, - schema: Schema, - config_upsert_values: Option>, - }, - /// A new index has been created. - CreateIndex { - /// The new secondary indices that are being added - new_indices: Vec, - /// The indices that have been modified. - removed_indices: Vec, - }, - /// Data is rewritten but *not* modified. This is used for things like - /// compaction or re-ordering. Contains the old fragments and the new - /// ones that have been replaced. - /// - /// This operation will modify the row addresses of existing rows and - /// so any existing index covering a rewritten fragment will need to be - /// remapped. - Rewrite { - /// Groups of fragments that have been modified - groups: Vec, - /// Indices that have been updated with the new row addresses - rewritten_indices: Vec, - }, - /// Merge a new column in - Merge { - fragments: Vec, - schema: Schema, - }, - /// Restore an old version of the database - Restore { version: u64 }, - /// Reserves fragment ids for future use - /// This can be used when row ids need to be known before a transaction - /// has been committed. It is used during a rewrite operation to allow - /// indices to be remapped to the new row ids as part of the operation. - ReserveFragments { num_fragments: u32 }, - - /// Update values in the dataset. - Update { - /// Ids of fragments that have been moved - removed_fragment_ids: Vec, - /// Fragments that have been updated - updated_fragments: Vec, - /// Fragments that have been added - new_fragments: Vec, - }, - - /// Project to a new schema. This only changes the schema, not the data. - Project { schema: Schema }, - - /// Update the dataset configuration. - UpdateConfig { - upsert_values: Option>, - delete_keys: Option>, - }, -} - -#[derive(Debug, Clone)] -pub struct RewrittenIndex { - pub old_id: Uuid, - pub new_id: Uuid, -} - -impl DeepSizeOf for RewrittenIndex { - fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize { - 0 - } -} - -#[derive(Debug, Clone, DeepSizeOf)] -pub struct RewriteGroup { - pub old_fragments: Vec, - pub new_fragments: Vec, -} - -impl Operation { - /// Returns the IDs of fragments that have been modified by this operation. - /// - /// This does not include new fragments. - fn modified_fragment_ids(&self) -> Box + '_> { - match self { - // These operations add new fragments or don't modify any. - Self::Append { .. } - | Self::Overwrite { .. } - | Self::CreateIndex { .. } - | Self::ReserveFragments { .. } - | Self::Project { .. } - | Self::UpdateConfig { .. } - | Self::Restore { .. } => Box::new(std::iter::empty()), - Self::Delete { - updated_fragments, - deleted_fragment_ids, - .. - } => Box::new( - updated_fragments - .iter() - .map(|f| f.id) - .chain(deleted_fragment_ids.iter().copied()), - ), - Self::Rewrite { groups, .. } => Box::new( - groups - .iter() - .flat_map(|f| f.old_fragments.iter().map(|f| f.id)), - ), - Self::Merge { fragments, .. 
} => Box::new(fragments.iter().map(|f| f.id)), - Self::Update { - updated_fragments, - removed_fragment_ids, - .. - } => Box::new( - updated_fragments - .iter() - .map(|f| f.id) - .chain(removed_fragment_ids.iter().copied()), - ), - } - } - - /// Returns the config keys that have been upserted by this operation. - fn get_upsert_config_keys(&self) -> Vec { - match self { - Self::Overwrite { - config_upsert_values: Some(upsert_values), - .. - } => { - let vec: Vec = upsert_values.keys().cloned().collect(); - vec - } - Self::UpdateConfig { - upsert_values: Some(uv), - .. - } => { - let vec: Vec = uv.keys().cloned().collect(); - vec - } - _ => Vec::::new(), - } - } - - /// Returns the config keys that have been deleted by this operation. - fn get_delete_config_keys(&self) -> Vec { - match self { - Self::UpdateConfig { - delete_keys: Some(dk), - .. - } => dk.clone(), - _ => Vec::::new(), - } - } - - /// Check whether another operation modifies the same fragment IDs as this one. - fn modifies_same_ids(&self, other: &Self) -> bool { - let self_ids = self.modified_fragment_ids().collect::>(); - let mut other_ids = other.modified_fragment_ids(); - other_ids.any(|id| self_ids.contains(&id)) - } - - /// Check whether another operation upserts a key that is referenced by another operation - fn upsert_key_conflict(&self, other: &Self) -> bool { - let self_upsert_keys = self.get_upsert_config_keys(); - let other_upsert_keys = other.get_upsert_config_keys(); - - let self_delete_keys = self.get_delete_config_keys(); - let other_delete_keys = other.get_delete_config_keys(); - - self_upsert_keys - .iter() - .any(|x| other_upsert_keys.contains(x) || other_delete_keys.contains(x)) - || other_upsert_keys - .iter() - .any(|x| self_upsert_keys.contains(x) || self_delete_keys.contains(x)) - } - - pub fn name(&self) -> &str { - match self { - Self::Append { .. } => "Append", - Self::Delete { .. } => "Delete", - Self::Overwrite { .. } => "Overwrite", - Self::CreateIndex { .. } => "CreateIndex", - Self::Rewrite { .. } => "Rewrite", - Self::Merge { .. } => "Merge", - Self::ReserveFragments { .. } => "ReserveFragments", - Self::Restore { .. } => "Restore", - Self::Update { .. } => "Update", - Self::Project { .. } => "Project", - Self::UpdateConfig { .. } => "UpdateConfig", - } - } -} - -impl Transaction { - pub fn new( - read_version: u64, - operation: Operation, - blobs_op: Option, - tag: Option, - ) -> Self { - let uuid = uuid::Uuid::new_v4().hyphenated().to_string(); - Self { - read_version, - uuid, - operation, - blobs_op, - tag, - } - } - - /// Returns true if the transaction cannot be committed if the other - /// transaction is committed first. - pub fn conflicts_with(&self, other: &Self) -> bool { - // This assumes IsolationLevel is Snapshot Isolation, which is more - // permissive than Serializable. In particular, it allows a Delete - // transaction to succeed after a concurrent Append, even if the Append - // added rows that would be deleted. - match &self.operation { - Operation::Append { .. } => match &other.operation { - // Append is compatible with anything that doesn't change the schema - Operation::Append { .. } => false, - Operation::Rewrite { .. } => false, - Operation::CreateIndex { .. } => false, - Operation::Delete { .. } | Operation::Update { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Project { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Rewrite { .. 
} => match &other.operation { - // Rewrite is only compatible with operations that don't touch - // existing fragments. - // TODO: it could also be compatible with operations that update - // fragments we don't touch. - Operation::Append { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Update { .. } => { - // As long as they rewrite disjoint fragments they shouldn't conflict. - self.operation.modifies_same_ids(&other.operation) - } - Operation::Project { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - // Restore always succeeds - Operation::Restore { .. } => false, - // ReserveFragments is compatible with anything that doesn't reset the - // max fragment id. - Operation::ReserveFragments { .. } => matches!( - &other.operation, - Operation::Overwrite { .. } | Operation::Restore { .. } - ), - Operation::CreateIndex { .. } => match &other.operation { - Operation::Append { .. } => false, - // Indices are identified by UUIDs, so they shouldn't conflict. - Operation::CreateIndex { .. } => false, - // Although some of the rows we indexed may have been deleted / moved, - // row ids are still valid, so we allow this optimistically. - Operation::Delete { .. } | Operation::Update { .. } => false, - // Merge & reserve don't change row ids, so this should be fine. - Operation::Merge { .. } => false, - Operation::ReserveFragments { .. } => false, - // Rewrite likely changed many of the row ids, so our index is - // likely useless. It should be rebuilt. - // TODO: we could be smarter here and only invalidate the index - // if the rewrite changed more than X% of row ids. - Operation::Rewrite { .. } => true, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { - Operation::CreateIndex { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Update { .. } => { - // If we update the same fragments, we conflict. - self.operation.modifies_same_ids(&other.operation) - } - Operation::Project { .. } => false, - Operation::Append { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { - match &other.operation { - Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { - self.operation.upsert_key_conflict(&other.operation) - } - _ => false, - } - } - // Merge changes the schema, but preserves row ids, so the only operations - // it's compatible with is CreateIndex, ReserveFragments, SetMetadata and DeleteMetadata. - Operation::Merge { .. } => !matches!( - &other.operation, - Operation::CreateIndex { .. } - | Operation::ReserveFragments { .. } - | Operation::UpdateConfig { .. } - ), - Operation::Project { .. } => match &other.operation { - // Project is compatible with anything that doesn't change the schema - Operation::CreateIndex { .. } => false, - Operation::Overwrite { .. } => false, - Operation::UpdateConfig { .. 
} => false, - _ => true, - }, - } - } - - fn fragments_with_ids<'a, T>( - new_fragments: T, - fragment_id: &'a mut u64, - ) -> impl Iterator + 'a - where - T: IntoIterator + 'a, - { - new_fragments.into_iter().map(move |mut f| { - if f.id == 0 { - f.id = *fragment_id; - *fragment_id += 1; - } - f - }) - } - - fn data_storage_format_from_files( - fragments: &[Fragment], - user_requested: Option, - ) -> Result { - if let Some(file_version) = Fragment::try_infer_version(fragments)? { - // Ensure user-requested matches data files - if let Some(user_requested) = user_requested { - if user_requested != file_version { - return Err(Error::invalid_input( - format!("User requested data storage version ({}) does not match version in data files ({})", user_requested, file_version), - location!(), - )); - } - } - Ok(DataStorageFormat::new(file_version)) - } else { - // If no files use user-requested or default - Ok(user_requested - .map(DataStorageFormat::new) - .unwrap_or_default()) - } - } - - pub(crate) async fn restore_old_manifest( - object_store: &ObjectStore, - commit_handler: &dyn CommitHandler, - base_path: &Path, - version: u64, - config: &ManifestWriteConfig, - tx_path: &str, - ) -> Result<(Manifest, Vec)> { - let location = commit_handler - .resolve_version_location(base_path, version, &object_store.inner) - .await?; - let mut manifest = read_manifest(object_store, &location.path, location.size).await?; - manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); - manifest.transaction_file = Some(tx_path.to_string()); - let indices = read_manifest_indexes(object_store, &location.path, &manifest).await?; - Ok((manifest, indices)) - } - - /// Create a new manifest from the current manifest and the transaction. - /// - /// `current_manifest` should only be None if the dataset does not yet exist. - pub(crate) fn build_manifest( - &self, - current_manifest: Option<&Manifest>, - current_indices: Vec, - transaction_file_path: &str, - config: &ManifestWriteConfig, - new_blob_version: Option, - ) -> Result<(Manifest, Vec)> { - if config.use_move_stable_row_ids - && current_manifest - .map(|m| !m.uses_move_stable_row_ids()) - .unwrap_or_default() - { - return Err(Error::NotSupported { - source: "Cannot enable stable row ids on existing dataset".into(), - location: location!(), - }); - } - - // Get the schema and the final fragment list - let schema = match self.operation { - Operation::Overwrite { ref schema, .. } => schema.clone(), - Operation::Merge { ref schema, .. } => schema.clone(), - Operation::Project { ref schema, .. } => schema.clone(), - _ => { - if let Some(current_manifest) = current_manifest { - current_manifest.schema.clone() - } else { - return Err(Error::Internal { - message: "Cannot create a new dataset without a schema".to_string(), - location: location!(), - }); - } - } - }; - - let mut fragment_id = if matches!(self.operation, Operation::Overwrite { .. 
}) { - 0 - } else { - current_manifest - .and_then(|m| m.max_fragment_id()) - .map(|id| id + 1) - .unwrap_or(0) - }; - let mut final_fragments = Vec::new(); - let mut final_indices = current_indices; - - let mut next_row_id = { - // Only use row ids if the feature flag is set already or - match (current_manifest, config.use_move_stable_row_ids) { - (Some(manifest), _) - if manifest.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0 => - { - Some(manifest.next_row_id) - } - (None, true) => Some(0), - (_, false) => None, - (Some(_), true) => { - return Err(Error::NotSupported { - source: "Cannot enable stable row ids on existing dataset".into(), - location: location!(), - }); - } - } - }; - - let maybe_existing_fragments = - current_manifest - .map(|m| m.fragments.as_ref()) - .ok_or_else(|| Error::Internal { - message: format!( - "No current manifest was provided while building manifest for operation {}", - self.operation.name() - ), - location: location!(), - }); - - match &self.operation { - Operation::Append { ref fragments } => { - final_fragments.extend(maybe_existing_fragments?.clone()); - let mut new_fragments = - Self::fragments_with_ids(fragments.clone(), &mut fragment_id) - .collect::>(); - if let Some(next_row_id) = &mut next_row_id { - Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; - } - final_fragments.extend(new_fragments); - } - Operation::Delete { - ref updated_fragments, - ref deleted_fragment_ids, - .. - } => { - // Remove the deleted fragments - final_fragments.extend(maybe_existing_fragments?.clone()); - final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id)); - final_fragments.iter_mut().for_each(|f| { - for updated in updated_fragments { - if updated.id == f.id { - *f = updated.clone(); - } - } - }); - Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) - } - Operation::Update { - removed_fragment_ids, - updated_fragments, - new_fragments, - } => { - final_fragments.extend(maybe_existing_fragments?.iter().filter_map(|f| { - if removed_fragment_ids.contains(&f.id) { - return None; - } - if let Some(updated) = updated_fragments.iter().find(|uf| uf.id == f.id) { - Some(updated.clone()) - } else { - Some(f.clone()) - } - })); - let mut new_fragments = - Self::fragments_with_ids(new_fragments.clone(), &mut fragment_id) - .collect::>(); - if let Some(next_row_id) = &mut next_row_id { - Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; - } - final_fragments.extend(new_fragments); - Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) - } - Operation::Overwrite { ref fragments, .. 
} => { - let mut new_fragments = - Self::fragments_with_ids(fragments.clone(), &mut fragment_id) - .collect::>(); - if let Some(next_row_id) = &mut next_row_id { - Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; - } - final_fragments.extend(new_fragments); - final_indices = Vec::new(); - } - Operation::Rewrite { - ref groups, - ref rewritten_indices, - } => { - final_fragments.extend(maybe_existing_fragments?.clone()); - let current_version = current_manifest.map(|m| m.version).unwrap_or_default(); - Self::handle_rewrite_fragments( - &mut final_fragments, - groups, - &mut fragment_id, - current_version, - )?; - - if next_row_id.is_some() { - // We can re-use indices, but need to rewrite the fragment bitmaps - debug_assert!(rewritten_indices.is_empty()); - for index in final_indices.iter_mut() { - if let Some(fragment_bitmap) = &mut index.fragment_bitmap { - *fragment_bitmap = - Self::recalculate_fragment_bitmap(fragment_bitmap, groups)?; - } - } - } else { - Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?; - } - } - Operation::CreateIndex { - new_indices, - removed_indices, - } => { - final_fragments.extend(maybe_existing_fragments?.clone()); - final_indices.retain(|existing_index| { - !new_indices - .iter() - .any(|new_index| new_index.name == existing_index.name) - && !removed_indices - .iter() - .any(|old_index| old_index.uuid == existing_index.uuid) - }); - final_indices.extend(new_indices.clone()); - } - Operation::ReserveFragments { .. } => { - final_fragments.extend(maybe_existing_fragments?.clone()); - } - Operation::Merge { ref fragments, .. } => { - final_fragments.extend(fragments.clone()); - - // Some fields that have indices may have been removed, so we should - // remove those indices as well. - Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) - } - Operation::Project { .. } => { - final_fragments.extend(maybe_existing_fragments?.clone()); - - // We might have removed all fields for certain data files, so - // we should remove the data files that are no longer relevant. - let remaining_field_ids = schema - .fields_pre_order() - .map(|f| f.id) - .collect::>(); - for fragment in final_fragments.iter_mut() { - fragment.files.retain(|file| { - file.fields - .iter() - .any(|field_id| remaining_field_ids.contains(field_id)) - }); - } - - // Some fields that have indices may have been removed, so we should - // remove those indices as well. - Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) - } - Operation::Restore { .. } => { - unreachable!() - } - Operation::UpdateConfig { .. } => {} - }; - - // If a fragment was reserved then it may not belong at the end of the fragments list. - final_fragments.sort_by_key(|frag| frag.id); - - let user_requested_version = match (&config.storage_format, config.use_legacy_format) { - (Some(storage_format), _) => Some(storage_format.lance_file_version()?), - (None, Some(true)) => Some(LanceFileVersion::Legacy), - (None, Some(false)) => Some(LanceFileVersion::V2_0), - (None, None) => None, - }; - - let mut manifest = if let Some(current_manifest) = current_manifest { - let mut prev_manifest = Manifest::new_from_previous( - current_manifest, - schema, - Arc::new(final_fragments), - new_blob_version, - ); - if user_requested_version.is_some() - && matches!(self.operation, Operation::Overwrite { .. }) - { - // If this is an overwrite operation and the user has requested a specific version - // then overwrite with that version. 
Otherwise, if the user didn't request a specific - // version, then overwrite with whatever version we had before. - prev_manifest.data_storage_format = - DataStorageFormat::new(user_requested_version.unwrap()); - } - prev_manifest - } else { - let data_storage_format = - Self::data_storage_format_from_files(&final_fragments, user_requested_version)?; - Manifest::new( - schema, - Arc::new(final_fragments), - data_storage_format, - new_blob_version, - ) - }; - - manifest.tag.clone_from(&self.tag); - - if config.auto_set_feature_flags { - apply_feature_flags(&mut manifest, config.use_move_stable_row_ids)?; - } - manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); - - manifest.update_max_fragment_id(); - - match &self.operation { - Operation::Overwrite { - config_upsert_values: Some(tm), - .. - } => manifest.update_config(tm.clone()), - Operation::UpdateConfig { - upsert_values, - delete_keys, - } => { - // Delete is handled first. If the same key is referenced by upsert and - // delete, then upserted key-value pair will remain. - if let Some(delete_keys) = delete_keys { - manifest.delete_config_keys( - delete_keys - .iter() - .map(|s| s.as_str()) - .collect::>() - .as_slice(), - ) - } - if let Some(upsert_values) = upsert_values { - manifest.update_config(upsert_values.clone()); - } - } - _ => {} - } - - if let Operation::ReserveFragments { num_fragments } = self.operation { - manifest.max_fragment_id += num_fragments; - } - - manifest.transaction_file = Some(transaction_file_path.to_string()); - - if let Some(next_row_id) = next_row_id { - manifest.next_row_id = next_row_id; - } - - Ok((manifest, final_indices)) - } - - fn retain_relevant_indices(indices: &mut Vec, schema: &Schema, fragments: &[Fragment]) { - let field_ids = schema - .fields_pre_order() - .map(|f| f.id) - .collect::>(); - indices.retain(|existing_index| { - existing_index - .fields - .iter() - .all(|field_id| field_ids.contains(field_id)) - }); - - // We might have also removed all fragments that an index was covering, so - // we should remove those indices as well. - let fragment_ids = fragments.iter().map(|f| f.id).collect::>(); - indices.retain(|existing_index| { - existing_index - .fragment_bitmap - .as_ref() - .map(|bitmap| bitmap.iter().any(|id| fragment_ids.contains(&(id as u64)))) - .unwrap_or(true) - }); - } - - fn recalculate_fragment_bitmap( - old: &RoaringBitmap, - groups: &[RewriteGroup], - ) -> Result { - let mut new_bitmap = old.clone(); - for group in groups { - let any_in_index = group - .old_fragments - .iter() - .any(|frag| old.contains(frag.id as u32)); - let all_in_index = group - .old_fragments - .iter() - .all(|frag| old.contains(frag.id as u32)); - // Any rewrite group may or may not be covered by the index. However, if any fragment - // in a rewrite group was previously covered by the index then all fragments in the rewrite - // group must have been previously covered by the index. plan_compaction takes care of - // this for us so this should be safe to assume. 
- if any_in_index { - if all_in_index { - for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) { - new_bitmap.remove(frag_id); - } - new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32)); - } else { - return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data", location!())); - } - } - } - Ok(new_bitmap) - } - - fn handle_rewrite_indices( - indices: &mut [Index], - rewritten_indices: &[RewrittenIndex], - groups: &[RewriteGroup], - ) -> Result<()> { - let mut modified_indices = HashSet::new(); - - for rewritten_index in rewritten_indices { - if !modified_indices.insert(rewritten_index.old_id) { - return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id), location!())); - } - - let index = indices - .iter_mut() - .find(|idx| idx.uuid == rewritten_index.old_id) - .ok_or_else(|| { - Error::invalid_input( - format!( - "Invalid compaction plan refers to index {} which does not exist", - rewritten_index.old_id - ), - location!(), - ) - })?; - - index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap( - index.fragment_bitmap.as_ref().ok_or_else(|| { - Error::invalid_input( - format!( - "Cannot rewrite index {} which did not store fragment bitmap", - index.uuid - ), - location!(), - ) - })?, - groups, - )?); - index.uuid = rewritten_index.new_id; - } - Ok(()) - } - - fn handle_rewrite_fragments( - final_fragments: &mut Vec, - groups: &[RewriteGroup], - fragment_id: &mut u64, - version: u64, - ) -> Result<()> { - for group in groups { - // If the old fragments are contiguous, find the range - let replace_range = { - let start = final_fragments.iter().enumerate().find(|(_, f)| f.id == group.old_fragments[0].id) - .ok_or_else(|| Error::CommitConflict { version, source: - format!("dataset does not contain a fragment a rewrite operation wants to replace: id={}", group.old_fragments[0].id).into() , location:location!()})?.0; - - // Verify old_fragments matches contiguous range - let mut i = 1; - loop { - if i == group.old_fragments.len() { - break Some(start..start + i); - } - if final_fragments[start + i].id != group.old_fragments[i].id { - break None; - } - i += 1; - } - }; - - let new_fragments = Self::fragments_with_ids(group.new_fragments.clone(), fragment_id); - if let Some(replace_range) = replace_range { - // Efficiently path using slice - final_fragments.splice(replace_range, new_fragments); - } else { - // Slower path for non-contiguous ranges - for fragment in group.old_fragments.iter() { - final_fragments.retain(|f| f.id != fragment.id); - } - final_fragments.extend(new_fragments); - } - } - Ok(()) - } - - fn assign_row_ids(next_row_id: &mut u64, fragments: &mut [Fragment]) -> Result<()> { - for fragment in fragments { - let physical_rows = fragment.physical_rows.ok_or_else(|| Error::Internal { - message: "Fragment does not have physical rows".into(), - location: location!(), - })? as u64; - let row_ids = *next_row_id..(*next_row_id + physical_rows); - let sequence = RowIdSequence::from(row_ids); - // TODO: write to a separate file if large. Possibly share a file with other fragments. 
- let serialized = write_row_ids(&sequence); - fragment.row_id_meta = Some(RowIdMeta::Inline(serialized)); - *next_row_id += physical_rows; - } - Ok(()) - } -} - -impl TryFrom for Transaction { - type Error = Error; - - fn try_from(message: pb::Transaction) -> Result { - let operation = match message.operation { - Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => { - Operation::Append { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - } - } - Some(pb::transaction::Operation::Delete(pb::transaction::Delete { - updated_fragments, - deleted_fragment_ids, - predicate, - })) => Operation::Delete { - updated_fragments: updated_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - deleted_fragment_ids, - predicate, - }, - Some(pb::transaction::Operation::Overwrite(pb::transaction::Overwrite { - fragments, - schema, - schema_metadata: _schema_metadata, // TODO: handle metadata - config_upsert_values, - })) => { - let config_upsert_option = if config_upsert_values.is_empty() { - Some(config_upsert_values) - } else { - None - }; - - Operation::Overwrite { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - schema: Schema::from(&Fields(schema)), - config_upsert_values: config_upsert_option, - } - } - Some(pb::transaction::Operation::ReserveFragments( - pb::transaction::ReserveFragments { num_fragments }, - )) => Operation::ReserveFragments { num_fragments }, - Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite { - old_fragments, - new_fragments, - groups, - rewritten_indices, - })) => { - let groups = if !groups.is_empty() { - groups - .into_iter() - .map(RewriteGroup::try_from) - .collect::>()? - } else { - vec![RewriteGroup { - old_fragments: old_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - new_fragments: new_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - }] - }; - let rewritten_indices = rewritten_indices - .iter() - .map(RewrittenIndex::try_from) - .collect::>()?; - - Operation::Rewrite { - groups, - rewritten_indices, - } - } - Some(pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex { - new_indices, - removed_indices, - })) => Operation::CreateIndex { - new_indices: new_indices - .into_iter() - .map(Index::try_from) - .collect::>()?, - removed_indices: removed_indices - .into_iter() - .map(Index::try_from) - .collect::>()?, - }, - Some(pb::transaction::Operation::Merge(pb::transaction::Merge { - fragments, - schema, - schema_metadata: _schema_metadata, // TODO: handle metadata - })) => Operation::Merge { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - schema: Schema::from(&Fields(schema)), - }, - Some(pb::transaction::Operation::Restore(pb::transaction::Restore { version })) => { - Operation::Restore { version } - } - Some(pb::transaction::Operation::Update(pb::transaction::Update { - removed_fragment_ids, - updated_fragments, - new_fragments, - })) => Operation::Update { - removed_fragment_ids, - updated_fragments: updated_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - new_fragments: new_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - }, - Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => { - Operation::Project { - schema: Schema::from(&Fields(schema)), - } - } - Some(pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig { - upsert_values, - 
delete_keys, - })) => { - let upsert_values = match upsert_values.len() { - 0 => None, - _ => Some(upsert_values), - }; - let delete_keys = match delete_keys.len() { - 0 => None, - _ => Some(delete_keys), - }; - Operation::UpdateConfig { - upsert_values, - delete_keys, - } - } - None => { - return Err(Error::Internal { - message: "Transaction message did not contain an operation".to_string(), - location: location!(), - }); - } - }; - let blobs_op = message - .blob_operation - .map(|blob_op| match blob_op { - pb::transaction::BlobOperation::BlobAppend(pb::transaction::Append { - fragments, - }) => Result::Ok(Operation::Append { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - }), - pb::transaction::BlobOperation::BlobOverwrite(pb::transaction::Overwrite { - fragments, - schema, - schema_metadata: _schema_metadata, // TODO: handle metadata - config_upsert_values, - }) => { - let config_upsert_option = if config_upsert_values.is_empty() { - Some(config_upsert_values) - } else { - None - }; - - Ok(Operation::Overwrite { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - schema: Schema::from(&Fields(schema)), - config_upsert_values: config_upsert_option, - }) - } - }) - .transpose()?; - Ok(Self { - read_version: message.read_version, - uuid: message.uuid.clone(), - operation, - blobs_op, - tag: if message.tag.is_empty() { - None - } else { - Some(message.tag.clone()) - }, - }) - } -} - -impl TryFrom<&pb::transaction::rewrite::RewrittenIndex> for RewrittenIndex { - type Error = Error; - - fn try_from(message: &pb::transaction::rewrite::RewrittenIndex) -> Result { - Ok(Self { - old_id: message - .old_id - .as_ref() - .map(Uuid::try_from) - .ok_or_else(|| { - Error::io( - "required field (old_id) missing from message".to_string(), - location!(), - ) - })??, - new_id: message - .new_id - .as_ref() - .map(Uuid::try_from) - .ok_or_else(|| { - Error::io( - "required field (new_id) missing from message".to_string(), - location!(), - ) - })??, - }) - } -} - -impl TryFrom for RewriteGroup { - type Error = Error; - - fn try_from(message: pb::transaction::rewrite::RewriteGroup) -> Result { - Ok(Self { - old_fragments: message - .old_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - new_fragments: message - .new_fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - }) - } -} - -impl From<&Transaction> for pb::Transaction { - fn from(value: &Transaction) -> Self { - let operation = match &value.operation { - Operation::Append { fragments } => { - pb::transaction::Operation::Append(pb::transaction::Append { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - }) - } - Operation::Delete { - updated_fragments, - deleted_fragment_ids, - predicate, - } => pb::transaction::Operation::Delete(pb::transaction::Delete { - updated_fragments: updated_fragments - .iter() - .map(pb::DataFragment::from) - .collect(), - deleted_fragment_ids: deleted_fragment_ids.clone(), - predicate: predicate.clone(), - }), - Operation::Overwrite { - fragments, - schema, - config_upsert_values, - } => { - pb::transaction::Operation::Overwrite(pb::transaction::Overwrite { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - schema: Fields::from(schema).0, - schema_metadata: Default::default(), // TODO: handle metadata - config_upsert_values: config_upsert_values - .clone() - .unwrap_or(Default::default()), - }) - } - Operation::ReserveFragments { num_fragments } => { - 
pb::transaction::Operation::ReserveFragments(pb::transaction::ReserveFragments { - num_fragments: *num_fragments, - }) - } - Operation::Rewrite { - groups, - rewritten_indices, - } => pb::transaction::Operation::Rewrite(pb::transaction::Rewrite { - groups: groups - .iter() - .map(pb::transaction::rewrite::RewriteGroup::from) - .collect(), - rewritten_indices: rewritten_indices - .iter() - .map(|rewritten| rewritten.into()) - .collect(), - ..Default::default() - }), - Operation::CreateIndex { - new_indices, - removed_indices, - } => pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex { - new_indices: new_indices.iter().map(IndexMetadata::from).collect(), - removed_indices: removed_indices.iter().map(IndexMetadata::from).collect(), - }), - Operation::Merge { fragments, schema } => { - pb::transaction::Operation::Merge(pb::transaction::Merge { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - schema: Fields::from(schema).0, - schema_metadata: Default::default(), // TODO: handle metadata - }) - } - Operation::Restore { version } => { - pb::transaction::Operation::Restore(pb::transaction::Restore { version: *version }) - } - Operation::Update { - removed_fragment_ids, - updated_fragments, - new_fragments, - } => pb::transaction::Operation::Update(pb::transaction::Update { - removed_fragment_ids: removed_fragment_ids.clone(), - updated_fragments: updated_fragments - .iter() - .map(pb::DataFragment::from) - .collect(), - new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(), - }), - Operation::Project { schema } => { - pb::transaction::Operation::Project(pb::transaction::Project { - schema: Fields::from(schema).0, - }) - } - Operation::UpdateConfig { - upsert_values, - delete_keys, - } => pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig { - upsert_values: upsert_values.clone().unwrap_or(Default::default()), - delete_keys: delete_keys.clone().unwrap_or(Default::default()), - }), - }; - - let blob_operation = value.blobs_op.as_ref().map(|op| match op { - Operation::Append { fragments } => { - pb::transaction::BlobOperation::BlobAppend(pb::transaction::Append { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - }) - } - Operation::Overwrite { - fragments, - schema, - config_upsert_values, - } => { - pb::transaction::BlobOperation::BlobOverwrite(pb::transaction::Overwrite { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - schema: Fields::from(schema).0, - schema_metadata: Default::default(), // TODO: handle metadata - config_upsert_values: config_upsert_values - .clone() - .unwrap_or(Default::default()), - }) - } - _ => panic!("Invalid blob operation: {:?}", value), - }); - - Self { - read_version: value.read_version, - uuid: value.uuid.clone(), - operation: Some(operation), - blob_operation, - tag: value.tag.clone().unwrap_or("".to_string()), - } - } -} - -impl From<&RewrittenIndex> for pb::transaction::rewrite::RewrittenIndex { - fn from(value: &RewrittenIndex) -> Self { - Self { - old_id: Some((&value.old_id).into()), - new_id: Some((&value.new_id).into()), - } - } -} - -impl From<&RewriteGroup> for pb::transaction::rewrite::RewriteGroup { - fn from(value: &RewriteGroup) -> Self { - Self { - old_fragments: value - .old_fragments - .iter() - .map(pb::DataFragment::from) - .collect(), - new_fragments: value - .new_fragments - .iter() - .map(pb::DataFragment::from) - .collect(), - } - } -} - -/// Validate the operation is valid for the given manifest. 
-pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> Result<()> { - let manifest = match (manifest, operation) { - ( - None, - Operation::Overwrite { - fragments, - schema, - config_upsert_values: None, - }, - ) => { - // Validate here because we are going to return early. - schema_fragments_valid(schema, fragments)?; - - return Ok(()); - } - (Some(manifest), _) => manifest, - (None, _) => { - return Err(Error::invalid_input( - format!( - "Cannot apply operation {} to non-existent dataset", - operation.name() - ), - location!(), - )); - } - }; - - match operation { - Operation::Append { fragments } => { - // Fragments must contain all fields in the schema - schema_fragments_valid(&manifest.schema, fragments) - } - Operation::Project { schema } => { - schema_fragments_valid(schema, manifest.fragments.as_ref()) - } - Operation::Merge { fragments, schema } - | Operation::Overwrite { - fragments, - schema, - config_upsert_values: None, - } => schema_fragments_valid(schema, fragments), - Operation::Update { - updated_fragments, - new_fragments, - .. - } => { - schema_fragments_valid(&manifest.schema, updated_fragments)?; - schema_fragments_valid(&manifest.schema, new_fragments) - } - _ => Ok(()), - } -} - -/// Check that each fragment contains all fields in the schema. -/// It is not required that the schema contains all fields in the fragment. -/// There may be masked fields. -fn schema_fragments_valid(schema: &Schema, fragments: &[Fragment]) -> Result<()> { - // TODO: add additional validation. Consider consolidating with various - // validate() methods in the codebase. - for fragment in fragments { - for field in schema.fields_pre_order() { - if !fragment - .files - .iter() - .flat_map(|f| f.fields.iter()) - .any(|f_id| f_id == &field.id) - { - return Err(Error::invalid_input( - format!( - "Fragment {} does not contain field {:?}", - fragment.id, field - ), - location!(), - )); - } - } - } - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_conflicts() { - let index0 = Index { - uuid: uuid::Uuid::new_v4(), - name: "test".to_string(), - fields: vec![0], - dataset_version: 1, - fragment_bitmap: None, - index_details: None, - }; - let fragment0 = Fragment::new(0); - let fragment1 = Fragment::new(1); - let fragment2 = Fragment::new(2); - // The transactions that will be checked against - let other_operations = [ - Operation::Append { - fragments: vec![fragment0.clone()], - }, - Operation::CreateIndex { - new_indices: vec![index0.clone()], - removed_indices: vec![index0.clone()], - }, - Operation::Delete { - updated_fragments: vec![fragment0.clone()], - deleted_fragment_ids: vec![2], - predicate: "x > 2".to_string(), - }, - Operation::Merge { - fragments: vec![fragment0.clone(), fragment2.clone()], - schema: Schema::default(), - }, - Operation::Overwrite { - fragments: vec![fragment0.clone(), fragment2.clone()], - schema: Schema::default(), - config_upsert_values: Some(HashMap::from_iter(vec![( - "overwrite-key".to_string(), - "value".to_string(), - )])), - }, - Operation::Rewrite { - groups: vec![RewriteGroup { - old_fragments: vec![fragment0.clone()], - new_fragments: vec![fragment1.clone()], - }], - rewritten_indices: vec![], - }, - Operation::ReserveFragments { num_fragments: 3 }, - Operation::Update { - removed_fragment_ids: vec![1], - updated_fragments: vec![fragment0.clone()], - new_fragments: vec![fragment2.clone()], - }, - Operation::UpdateConfig { - upsert_values: Some(HashMap::from_iter(vec![( - "lance.test".to_string(), - 
"value".to_string(), - )])), - delete_keys: Some(vec!["remove-key".to_string()]), - }, - ]; - let other_transactions = other_operations - .iter() - .map(|op| Transaction::new(0, op.clone(), None, None)) - .collect::>(); - - // Transactions and whether they are expected to conflict with each - // of other_transactions - let cases = [ - ( - Operation::Append { - fragments: vec![fragment0.clone()], - }, - [false, false, false, true, true, false, false, false, false], - ), - ( - Operation::Delete { - // Delete that affects fragments different from other transactions - updated_fragments: vec![fragment1.clone()], - deleted_fragment_ids: vec![], - predicate: "x > 2".to_string(), - }, - [false, false, false, true, true, false, false, true, false], - ), - ( - Operation::Delete { - // Delete that affects same fragments as other transactions - updated_fragments: vec![fragment0.clone(), fragment2.clone()], - deleted_fragment_ids: vec![], - predicate: "x > 2".to_string(), - }, - [false, false, true, true, true, true, false, true, false], - ), - ( - Operation::Overwrite { - fragments: vec![fragment0.clone(), fragment2.clone()], - schema: Schema::default(), - config_upsert_values: None, - }, - // No conflicts: overwrite can always happen since it doesn't - // depend on previous state of the table. - [ - false, false, false, false, false, false, false, false, false, - ], - ), - ( - Operation::CreateIndex { - new_indices: vec![index0.clone()], - removed_indices: vec![index0], - }, - // Will only conflict with operations that modify row ids. - [false, false, false, false, true, true, false, false, false], - ), - ( - // Rewrite that affects different fragments - Operation::Rewrite { - groups: vec![RewriteGroup { - old_fragments: vec![fragment1], - new_fragments: vec![fragment0.clone()], - }], - rewritten_indices: Vec::new(), - }, - [false, true, false, true, true, false, false, true, false], - ), - ( - // Rewrite that affects the same fragments - Operation::Rewrite { - groups: vec![RewriteGroup { - old_fragments: vec![fragment0.clone(), fragment2.clone()], - new_fragments: vec![fragment0.clone()], - }], - rewritten_indices: Vec::new(), - }, - [false, true, true, true, true, true, false, true, false], - ), - ( - Operation::Merge { - fragments: vec![fragment0.clone(), fragment2.clone()], - schema: Schema::default(), - }, - // Merge conflicts with everything except CreateIndex and ReserveFragments. - [true, false, true, true, true, true, false, true, false], - ), - ( - Operation::ReserveFragments { num_fragments: 2 }, - // ReserveFragments only conflicts with Overwrite and Restore. 
-                [false, false, false, false, true, false, false, false, false],
-            ),
-            (
-                Operation::Update {
-                    // Update that affects same fragments as other transactions
-                    updated_fragments: vec![fragment0],
-                    removed_fragment_ids: vec![],
-                    new_fragments: vec![fragment2],
-                },
-                [false, false, true, true, true, true, false, true, false],
-            ),
-            (
-                // Update config that should not conflict with anything
-                Operation::UpdateConfig {
-                    upsert_values: Some(HashMap::from_iter(vec![(
-                        "other-key".to_string(),
-                        "new-value".to_string(),
-                    )])),
-                    delete_keys: None,
-                },
-                [
-                    false, false, false, false, false, false, false, false, false,
-                ],
-            ),
-            (
-                // Update config that conflicts with key being upserted by other UpdateConfig operation
-                Operation::UpdateConfig {
-                    upsert_values: Some(HashMap::from_iter(vec![(
-                        "lance.test".to_string(),
-                        "new-value".to_string(),
-                    )])),
-                    delete_keys: None,
-                },
-                [false, false, false, false, false, false, false, false, true],
-            ),
-            (
-                // Update config that conflicts with key being deleted by other UpdateConfig operation
-                Operation::UpdateConfig {
-                    upsert_values: Some(HashMap::from_iter(vec![(
-                        "remove-key".to_string(),
-                        "new-value".to_string(),
-                    )])),
-                    delete_keys: None,
-                },
-                [false, false, false, false, false, false, false, false, true],
-            ),
-            (
-                // Delete config keys currently being deleted by other UpdateConfig operation
-                Operation::UpdateConfig {
-                    upsert_values: None,
-                    delete_keys: Some(vec!["remove-key".to_string()]),
-                },
-                [
-                    false, false, false, false, false, false, false, false, false,
-                ],
-            ),
-            (
-                // Delete config keys currently being upserted by other UpdateConfig operation
-                Operation::UpdateConfig {
-                    upsert_values: None,
-                    delete_keys: Some(vec!["lance.test".to_string()]),
-                },
-                [false, false, false, false, false, false, false, false, true],
-            ),
-        ];
-
-        for (operation, expected_conflicts) in &cases {
-            let transaction = Transaction::new(0, operation.clone(), None, None);
-            for (other, expected_conflict) in other_transactions.iter().zip(expected_conflicts) {
-                assert_eq!(
-                    transaction.conflicts_with(other),
-                    *expected_conflict,
-                    "Transaction {:?} should {} with {:?}",
-                    transaction,
-                    if *expected_conflict {
-                        "conflict"
-                    } else {
-                        "not conflict"
-                    },
-                    other
-                );
-            }
-        }
-    }
-
-    #[test]
-    fn test_rewrite_fragments() {
-        let existing_fragments: Vec<Fragment> = (0..10).map(Fragment::new).collect();
-
-        let mut final_fragments = existing_fragments;
-        let rewrite_groups = vec![
-            // Since these are contiguous, they will be put in the same location
-            // as 1 and 2.
-            RewriteGroup {
-                old_fragments: vec![Fragment::new(1), Fragment::new(2)],
-                // These two fragments were previously reserved
-                new_fragments: vec![Fragment::new(15), Fragment::new(16)],
-            },
-            // These are not contiguous, so they will be inserted at the end.
-            RewriteGroup {
-                old_fragments: vec![Fragment::new(5), Fragment::new(8)],
-                // We pretend this id was not reserved. Does not happen in practice today
-                // but we want to leave the door open.
-                new_fragments: vec![Fragment::new(0)],
-            },
-        ];
-
-        let mut fragment_id = 20;
-        let version = 0;
-
-        Transaction::handle_rewrite_fragments(
-            &mut final_fragments,
-            &rewrite_groups,
-            &mut fragment_id,
-            version,
-        )
-        .unwrap();
-
-        assert_eq!(fragment_id, 21);
-
-        let expected_fragments: Vec<Fragment> = vec![
-            Fragment::new(0),
-            Fragment::new(15),
-            Fragment::new(16),
-            Fragment::new(3),
-            Fragment::new(4),
-            Fragment::new(6),
-            Fragment::new(7),
-            Fragment::new(9),
-            Fragment::new(20),
-        ];
-
-        assert_eq!(final_fragments, expected_fragments);
-    }
-}
+pub use lance_table::format::transaction::{Operation, RewriteGroup, RewrittenIndex, Transaction};
diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs
index 4b7fb4cea7..9e49e268db 100644
--- a/rust/lance/src/dataset/write/commit.rs
+++ b/rust/lance/src/dataset/write/commit.rs
@@ -13,16 +13,17 @@ use snafu::{location, Location};

 use crate::{
     dataset::{
-        builder::DatasetBuilder,
-        commit_detached_transaction, commit_new_dataset, commit_transaction,
-        refs::Tags,
-        transaction::{Operation, Transaction},
-        ManifestWriteConfig, ReadParams,
+        builder::DatasetBuilder, commit_detached_transaction, commit_new_dataset,
+        commit_transaction, refs::Tags, ReadParams,
     },
     session::Session,
     Dataset, Error, Result,
 };
+use lance_table::format::transaction::{
+    v1::Transaction as V1Transaction, ManifestWriteConfig, Operation, Transaction,
+};
+
 use super::{resolve_commit_handler, WriteDestination};

 /// Create a new commit from a [`Transaction`].
@@ -211,8 +212,8 @@ impl<'a> CommitBuilder<'a> {
         // If we are using a detached version, we need to load the dataset.
         // Otherwise, we are writing to the main history, and need to check
         // out the latest version.
-        if is_detached_version(transaction.read_version) {
-            builder = builder.with_version(transaction.read_version)
+        if is_detached_version(transaction.read_version()) {
+            builder = builder.with_version(transaction.read_version())
         }

         match builder.load().await {
@@ -225,8 +226,15 @@ impl<'a> CommitBuilder<'a> {
             }
         };

-        if dest.dataset().is_none() && !matches!(transaction.operation, Operation::Overwrite { .. })
-        {
+        let is_overwrite = matches!(
+            transaction,
+            Transaction::V1(V1Transaction {
+                operation: Operation::Overwrite { .. },
+                ..
+            })
+        );
+
+        if dest.dataset().is_none() && !is_overwrite {
             return Err(Error::DatasetNotFound {
                 path: base_path.to_string(),
                 source: "The dataset must already exist unless the operation is Overwrite".into(),
@@ -252,9 +260,7 @@ impl<'a> CommitBuilder<'a> {
         if let Some(ds) = dest.dataset() {
             if let Some(storage_format) = self.storage_format {
                 let passed_storage_format = DataStorageFormat::new(storage_format);
-                if ds.manifest.data_storage_format != passed_storage_format
-                    && !matches!(transaction.operation, Operation::Overwrite { .. })
-                {
+                if ds.manifest.data_storage_format != passed_storage_format && !is_overwrite {
                     return Err(Error::InvalidInput {
                         source: format!(
                             "Storage format mismatch. Existing dataset uses {:?}, but new data uses {:?}",
@@ -361,6 +367,18 @@ impl<'a> CommitBuilder<'a> {
                 location: location!(),
             });
         }
+
+        // TODO: support V2 transactions
+        let transactions = transactions
+            .into_iter()
+            .map(|t| match t {
+                Transaction::V1(t) => Ok(t),
+                _ => Err(Error::NotSupported {
+                    source: "Only v1 transactions are supported in batch commits".into(),
+                    location: location!(),
+                }),
+            })
+            .collect::<Result<Vec<_>>>()?;
         if transactions
             .iter()
             .any(|t| !matches!(t.operation, Operation::Append { .. }))
@@ -388,21 +406,16 @@
             })
         };

-        let merged = Transaction {
-            uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
-            operation: Operation::Append {
-                fragments: transactions
-                    .iter()
-                    .flat_map(|t| match &t.operation {
-                        Operation::Append { fragments } => fragments.clone(),
-                        _ => unreachable!(),
-                    })
-                    .collect(),
-            },
-            read_version,
-            blobs_op,
-            tag: None,
+        let operation = Operation::Append {
+            fragments: transactions
+                .iter()
+                .flat_map(|t| match &t.operation {
+                    Operation::Append { fragments } => fragments.clone(),
+                    _ => unreachable!(),
+                })
+                .collect(),
         };
+        let merged = Transaction::new_v1(read_version, operation, blobs_op);
         let dataset = self.execute(merged.clone()).await?;
         Ok(BatchCommitResult { dataset, merged })
     }
@@ -448,15 +461,13 @@ mod tests {
     }

     fn sample_transaction(read_version: u64) -> Transaction {
-        Transaction {
-            uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
-            operation: Operation::Append {
+        Transaction::new_v1(
+            read_version,
+            Operation::Append {
                 fragments: vec![sample_fragment()],
             },
-            read_version,
-            blobs_op: None,
-            tag: None,
-        }
+            None,
+        )
     }

     #[tokio::test]
@@ -586,17 +597,15 @@ mod tests {
         assert!(matches!(res, Err(Error::InvalidInput { .. })));

         // Attempting to commit update gives error
-        let update_transaction = Transaction {
-            uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
-            operation: Operation::Update {
+        let update_transaction = Transaction::new_v1(
+            1, // read version
+            Operation::Update {
                 updated_fragments: vec![],
                 new_fragments: vec![],
                 removed_fragment_ids: vec![],
             },
-            read_version: 1,
-            blobs_op: None,
-            tag: None,
-        };
+            None, // no blobs
+        );
         let res = CommitBuilder::new(dataset.clone())
             .execute_batch(vec![update_transaction])
             .await;
@@ -606,10 +615,10 @@ mod tests {
         let append1 = sample_transaction(1);
         let append2 = sample_transaction(2);
         let mut expected_fragments = vec![];
-        if let Operation::Append { fragments } = &append1.operation {
+        if let Operation::Append { fragments } = &append1.operation().unwrap() {
             expected_fragments.extend(fragments.clone());
         }
-        if let Operation::Append { fragments } = &append2.operation {
+        if let Operation::Append { fragments } = &append2.operation().unwrap() {
             expected_fragments.extend(fragments.clone());
         }
         let res = CommitBuilder::new(dataset.clone())
@@ -618,9 +627,9 @@ mod tests {
             .unwrap();
         let transaction = res.merged;
         assert!(
-            matches!(transaction.operation, Operation::Append { fragments } if fragments == expected_fragments)
+            matches!(transaction.operation().unwrap(), Operation::Append { fragments } if fragments == &expected_fragments)
         );
-        assert_eq!(transaction.read_version, 1);
-        assert!(transaction.blobs_op.is_none());
+        assert_eq!(transaction.read_version(), 1);
+        assert!(transaction.blob_transaction(0).is_none());
     }
 }
diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs
index 89bc008e28..2e674dc552 100644
--- a/rust/lance/src/dataset/write/insert.rs
+++ b/rust/lance/src/dataset/write/insert.rs
@@ -223,7 +223,7 @@ impl<'a> InsertBuilder<'a> {
             WriteMode::Append => Operation::Append { fragments: blob.0 },
         });

-        Ok(Transaction::new(
+        Ok(Transaction::new_v1(
             context
                 .dest
                 .dataset()
@@ -231,7 +231,6 @@ impl<'a> InsertBuilder<'a> {
                 .unwrap_or(0),
             operation,
             blobs_op,
-            None,
         ))
     }

diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs
index 16af301c8d..0e72069a4d 100644
--- a/rust/lance/src/dataset/write/merge_insert.rs
+++ b/rust/lance/src/dataset/write/merge_insert.rs
@@ -992,12 +992,8 @@ impl MergeInsertJob {
             updated_fragments,
             new_fragments,
         };
-        let transaction = Transaction::new(
-            dataset.manifest.version,
-            operation,
-            /*blobs_op=*/ None,
-            None,
-        );
+        let transaction =
+            Transaction::new_v1(dataset.manifest.version, operation, /*blobs_op=*/ None);

         let (manifest, manifest_path) = commit_transaction(
             dataset.as_ref(),
diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs
index b15a328967..09e54dde78 100644
--- a/rust/lance/src/dataset/write/update.rs
+++ b/rust/lance/src/dataset/write/update.rs
@@ -366,11 +366,10 @@ impl UpdateJob {
             updated_fragments,
             new_fragments,
         };
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.dataset.manifest.version,
             operation,
             /*blobs_op=*/ None,
-            None,
         );

         let (manifest, manifest_path) = commit_transaction(
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index ace9906d5c..b50287ae30 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -318,14 +318,13 @@ impl DatasetIndexExt for Dataset {
             fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
             index_details: Some(index_details),
         };
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::CreateIndex {
                 new_indices: vec![new_idx],
                 removed_indices: vec![],
             },
             /*blobs_op= */ None,
-            None,
         );

         let (new_manifest, manifest_path) = commit_transaction(
@@ -394,14 +393,13 @@ impl DatasetIndexExt for Dataset {
             index_details: None,
         };

-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::CreateIndex {
                 new_indices: vec![new_idx],
                 removed_indices: vec![],
             },
             /*blobs_op= */ None,
-            None,
         );

         let (new_manifest, new_path) = commit_transaction(
@@ -493,14 +491,13 @@ impl DatasetIndexExt for Dataset {
             return Ok(());
         }

-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             self.manifest.version,
             Operation::CreateIndex {
                 new_indices,
                 removed_indices,
             },
             /*blobs_op= */ None,
-            None,
         );

         let (new_manifest, manifest_path) = commit_transaction(
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index e8e77e4b41..ed39f3fbaf 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -27,8 +27,10 @@ use std::sync::Arc;

 use lance_file::version::LanceFileVersion;
 use lance_table::format::{
-    is_detached_version, pb, DataStorageFormat, DeletionFile, Fragment, Index, Manifest,
-    WriterVersion, DETACHED_VERSION_MASK,
+    is_detached_version, pb,
+    transaction::{v1::Transaction as V1Transaction, ManifestWriteConfig, Operation, Transaction},
+    DataStorageFormat, DeletionFile, Fragment, Index, Manifest, WriterVersion,
+    DETACHED_VERSION_MASK,
 };
 use lance_table::io::commit::{CommitConfig, CommitError, CommitHandler, ManifestNamingScheme};
 use lance_table::io::deletion::read_deletion_file;
@@ -44,8 +46,7 @@ use prost::Message;

 use super::ObjectStore;
 use crate::dataset::fragment::FileFragment;
-use crate::dataset::transaction::{Operation, Transaction};
-use crate::dataset::{write_manifest_file, ManifestWriteConfig, BLOB_DIR};
+use crate::dataset::{write_manifest_file, BLOB_DIR};
 use crate::index::DatasetIndexInternalExt;
 use crate::session::Session;
 use crate::Dataset;
@@ -110,7 +111,7 @@ async fn write_transaction_file(
     base_path: &Path,
     transaction: &Transaction,
 ) -> Result<String> {
-    let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
+    let file_name = format!("{}-{}.txn", transaction.read_version(), transaction.uuid());
     let path = base_path.child("_transactions").child(file_name.as_str());

     let message = pb::Transaction::from(transaction);
@@ -213,9 +214,8 @@ pub(crate) async fn commit_new_dataset(
     manifest_naming_scheme: ManifestNamingScheme,
     session: &Session,
 ) -> Result<(Manifest, Path)> {
-    let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
+    let blob_version = if let Some(blob_tx) = transaction.blob_transaction(0) {
         let blob_path = base_path.child(BLOB_DIR);
-        let blob_tx = Transaction::new(0, blob_op.clone(), None, None);
         let (blob_manifest, _) = do_commit_new_dataset(
             object_store,
             commit_handler,
@@ -535,25 +535,28 @@ pub(crate) async fn do_commit_detached_transaction(
     // Pick a random u64 with the highest bit set to indicate it is detached
     let random_version = thread_rng().gen::<u64>() | DETACHED_VERSION_MASK;

-    let (mut manifest, mut indices) = match transaction.operation {
-        Operation::Restore { version } => {
-            Transaction::restore_old_manifest(
-                object_store,
-                commit_handler,
-                &dataset.base,
-                version,
-                write_config,
-                &transaction_file,
-            )
-            .await?
-        }
-        _ => transaction.build_manifest(
+    let (mut manifest, mut indices) = if let Transaction::V1(V1Transaction {
+        operation: Operation::Restore { version },
+        ..
+    }) = transaction
+    {
+        Transaction::restore_old_manifest(
+            object_store,
+            commit_handler,
+            &dataset.base,
+            *version,
+            write_config,
+            &transaction_file,
+        )
+        .await?
+    } else {
+        transaction.build_manifest(
             Some(dataset.manifest.as_ref()),
             dataset.load_indices().await?.as_ref().clone(),
             &transaction_file,
             write_config,
             new_blob_version,
-        )?,
+        )?
     };

     manifest.version = random_version;
@@ -621,21 +624,22 @@ pub(crate) async fn commit_detached_transaction(
     write_config: &ManifestWriteConfig,
     commit_config: &CommitConfig,
 ) -> Result<(Manifest, Path)> {
-    let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
-        let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
-        let blobs_tx =
-            Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
-        let (blobs_manifest, _) = do_commit_detached_transaction(
-            blobs_dataset.as_ref(),
-            object_store,
-            commit_handler,
-            &blobs_tx,
-            write_config,
-            commit_config,
-            None,
-        )
-        .await?;
-        Some(blobs_manifest.version)
+    let new_blob_version = if let Some(blob_ds) = dataset.blobs_dataset().await? {
+        if let Some(blobs_tx) = transaction.blob_transaction(blob_ds.version().version) {
+            let (blobs_manifest, _) = do_commit_detached_transaction(
+                blob_ds.as_ref(),
+                object_store,
+                commit_handler,
+                &blobs_tx,
+                write_config,
+                commit_config,
+                None,
+            )
+            .await?;
+            Some(blobs_manifest.version)
+        } else {
+            None
+        }
     } else {
         None
     };
@@ -662,21 +666,22 @@ pub(crate) async fn commit_transaction(
     commit_config: &CommitConfig,
     manifest_naming_scheme: ManifestNamingScheme,
 ) -> Result<(Manifest, Path)> {
-    let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
-        let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
-        let blobs_tx =
-            Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
-        let (blobs_manifest, _) = do_commit_detached_transaction(
-            blobs_dataset.as_ref(),
-            object_store,
-            commit_handler,
-            &blobs_tx,
-            write_config,
-            commit_config,
-            None,
-        )
-        .await?;
-        Some(blobs_manifest.version)
+    let new_blob_version = if let Some(blob_ds) = dataset.blobs_dataset().await? {
+        if let Some(blobs_tx) = transaction.blob_transaction(blob_ds.version().version) {
+            let (blobs_manifest, _) = do_commit_detached_transaction(
+                blob_ds.as_ref(),
+                object_store,
+                commit_handler,
+                &blobs_tx,
+                write_config,
+                commit_config,
+                None,
+            )
+            .await?;
+            Some(blobs_manifest.version)
+        } else {
+            None
+        }
     } else {
         None
     };
@@ -686,7 +691,7 @@ pub(crate) async fn commit_transaction(
     let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;

     // First, get all transactions since read_version
-    let read_version = transaction.read_version;
+    let read_version = transaction.read_version();
     let mut dataset = dataset.clone();
     // We need to checkout the latest version, because any fixes we apply
     // (like computing the new row ids) needs to be done based on the most
@@ -725,25 +730,28 @@ pub(crate) async fn commit_transaction(

     for attempt_i in 0..commit_config.num_retries {
         // Build an up-to-date manifest from the transaction and current manifest
-        let (mut manifest, mut indices) = match transaction.operation {
-            Operation::Restore { version } => {
-                Transaction::restore_old_manifest(
-                    object_store,
-                    commit_handler,
-                    &dataset.base,
-                    version,
-                    write_config,
-                    &transaction_file,
-                )
-                .await?
-            }
-            _ => transaction.build_manifest(
+        let (mut manifest, mut indices) = if let Transaction::V1(V1Transaction {
+            operation: Operation::Restore { version },
+            ..
+        }) = transaction
+        {
+            Transaction::restore_old_manifest(
+                object_store,
+                commit_handler,
+                &dataset.base,
+                *version,
+                write_config,
+                &transaction_file,
+            )
+            .await?
+        } else {
+            transaction.build_manifest(
                 Some(dataset.manifest.as_ref()),
                 dataset.load_indices().await?.as_ref().clone(),
                 &transaction_file,
                 write_config,
                 new_blob_version,
-            )?,
+            )?
         };

         manifest.version = target_version;
@@ -1000,11 +1008,10 @@ mod tests {
     async fn test_roundtrip_transaction_file() {
        let object_store = ObjectStore::memory();
        let base_path = Path::from("test");
-        let transaction = Transaction::new(
+        let transaction = Transaction::new_v1(
             42,
             Operation::Append { fragments: vec![] },
             /*blobs_op= */ None,
-            Some("hello world".to_string()),
         );

         let file_name = write_transaction_file(&object_store, &base_path, &transaction)
             .await
             .unwrap();
         let read_transaction = read_transaction_file(&object_store, &base_path, &file_name)
             .await
             .unwrap();

-        assert_eq!(transaction.read_version, read_transaction.read_version);
-        assert_eq!(transaction.uuid, read_transaction.uuid);
+        assert_eq!(transaction.read_version(), read_transaction.read_version());
+        assert_eq!(transaction.uuid(), read_transaction.uuid());
         assert!(matches!(
-            read_transaction.operation,
-            Operation::Append { .. }
+            read_transaction,
+            Transaction::V1(V1Transaction {
+                operation: Operation::Append { .. },
+                ..
+            }),
         ));
-        assert_eq!(transaction.tag, read_transaction.tag);
     }

     #[tokio::test]
diff --git a/rust/lance/src/utils/temporal.rs b/rust/lance/src/utils/temporal.rs
index a518e90a1f..a597a2396e 100644
--- a/rust/lance/src/utils/temporal.rs
+++ b/rust/lance/src/utils/temporal.rs
@@ -25,11 +25,3 @@ pub fn utc_now() -> DateTime<Utc> {
         .naive_utc();
     Utc.from_utc_datetime(&naive)
 }
-
-pub fn timestamp_to_nanos(timestamp: Option<SystemTime>) -> u128 {
-    let timestamp = timestamp.unwrap_or_else(SystemTime::now);
-    timestamp
-        .duration_since(SystemTime::UNIX_EPOCH)
-        .unwrap()
-        .as_nanos()
-}
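Note on the caller-facing change in this diff: `Transaction` becomes an enum over format versions, so callers construct transactions with `Transaction::new_v1` (which drops the old trailing `tag` argument) and read fields through accessors instead of direct field access. Below is a minimal sketch of the migrated call patterns, assuming only the API visible in this diff (`new_v1`, `read_version()`, `uuid()`, the `Transaction::V1`/`V1Transaction` pattern); the two helper functions are hypothetical and not part of the change:

```rust
use lance_table::format::transaction::{v1::Transaction as V1Transaction, Operation, Transaction};

// Hypothetical helper: summarizes a transaction using the new accessors.
fn describe_txn(transaction: &Transaction) -> String {
    // Direct field access (`transaction.read_version`) becomes an accessor call.
    let mut summary = format!(
        "txn {} @ read_version {}",
        transaction.uuid(),
        transaction.read_version()
    );
    // `matches!(transaction.operation, ...)` becomes a match on the enum variant,
    // as in the `is_overwrite` check in commit.rs above.
    if let Transaction::V1(V1Transaction {
        operation: Operation::Overwrite { .. },
        ..
    }) = transaction
    {
        summary.push_str(" (overwrite)");
    }
    summary
}

// Hypothetical helper: construction via `new_v1` drops the trailing `tag`
// argument of the old `Transaction::new`.
fn empty_append(read_version: u64) -> Transaction {
    Transaction::new_v1(
        read_version,
        Operation::Append { fragments: vec![] },
        /*blobs_op=*/ None,
    )
}
```

Keeping the version in the type this way lets existing readers of v1 `.txn` files keep working while the composite (v2) representation from the proto change is introduced; batch commits reject non-v1 transactions for now, per the `TODO: support V2 transactions` above.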