diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py
index d16f2f30c3..f900a26f6c 100644
--- a/python/python/lance/__init__.py
+++ b/python/python/lance/__init__.py
@@ -11,6 +11,7 @@
     LanceOperation,
     LanceScanner,
     MergeInsertBuilder,
+    Transaction,
     __version__,
     batch_udf,
     write_dataset,
@@ -38,6 +39,7 @@
     "LanceOperation",
     "LanceScanner",
     "MergeInsertBuilder",
+    "Transaction",
    "__version__",
     "write_dataset",
     "schema_to_json",
diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index ae0593a6e4..836477c2ce 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -4,11 +4,13 @@
 from __future__ import annotations
 
 import copy
+import dataclasses
 import json
 import logging
 import os
 import random
 import time
+import uuid
 import warnings
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
@@ -23,6 +25,7 @@
     List,
     Literal,
     Optional,
+    Sequence,
     Set,
     TypedDict,
     Union,
@@ -2171,7 +2174,6 @@ def commit(
         2   3  c
         3   4  d
         """
-        # TODO: mode is never used!
         if isinstance(base_uri, Path):
             base_uri = str(base_uri)
         elif isinstance(base_uri, LanceDataset):
@@ -2211,6 +2213,112 @@ def commit(
         ds._default_scan_options = None
         return ds
 
+    @staticmethod
+    def commit_batch(
+        dest: Union[str, Path, LanceDataset],
+        transactions: Sequence[Transaction],
+        commit_lock: Optional[CommitLock] = None,
+        storage_options: Optional[Dict[str, str]] = None,
+        enable_v2_manifest_paths: Optional[bool] = None,
+        detached: Optional[bool] = False,
+        max_retries: int = 20,
+    ) -> BulkCommitResult:
+        """Create a new version of a dataset from multiple transactions.
+
+        This is an advanced method that allows users to describe a change that
+        has already been made to the data files. It is not needed when using
+        Lance to apply changes (e.g. when using :py:class:`LanceDataset` or
+        :py:func:`write_dataset`).
+
+        Parameters
+        ----------
+        dest: str, Path, or LanceDataset
+            The base uri of the dataset, or the dataset object itself. Using
+            the dataset object can be more efficient because it can re-use the
+            file metadata cache.
+        transactions: Sequence[Transaction]
+            The transactions to apply to the dataset. These will be merged into
+            a single transaction and applied to the dataset. Note: Only append
+            transactions are currently supported. Other transaction types will
+            be supported in the future.
+        commit_lock : CommitLock, optional
+            A custom commit lock. Only needed if your object store does not
+            support atomic commits. See the user guide for more details.
+        storage_options : optional, dict
+            Extra options that make sense for a particular storage connection.
+            This is used to store connection parameters like credentials,
+            endpoint, etc.
+        enable_v2_manifest_paths : bool, optional
+            If True, and this is a new dataset, uses the new V2 manifest paths.
+            These paths provide more efficient opening of datasets with many
+            versions on object stores. This parameter has no effect if the
+            dataset already exists. To migrate an existing dataset, instead use
+            the :meth:`migrate_manifest_paths_v2` method. Default is False.
+            WARNING: turning this on will make the dataset unreadable for older
+            versions of Lance (prior to 0.17.0).
+        detached : bool, optional
+            If True, then the commit will not be part of the dataset lineage.
+            It will never show up as the latest dataset and the only way to
+            check it out in the future will be to specifically check it out by
+            version. The version will be a random version that is only unique
+            amongst detached commits. The caller
+            should store this version somewhere, as there will be no other way
+            to obtain it in the future.
+        max_retries : int
+            The maximum number of retries to perform when committing the dataset.
+
+        Returns
+        -------
+        dict with keys:
+            dataset: LanceDataset
+                A new version of the Lance dataset.
+            merged: Transaction
+                The merged transaction that was applied to the dataset.
+        """
+        if isinstance(dest, Path):
+            dest = str(dest)
+        elif isinstance(dest, LanceDataset):
+            dest = dest._ds
+        elif not isinstance(dest, str):
+            raise TypeError(
+                f"dest must be str, Path, or LanceDataset, got {type(dest)}"
+            )
+
+        if commit_lock:
+            if not callable(commit_lock):
+                raise TypeError(
+                    f"commit_lock must be a function, got {type(commit_lock)}"
+                )
+
+        new_ds, merged = _Dataset.commit_batch(
+            dest,
+            transactions,
+            commit_lock,
+            storage_options=storage_options,
+            enable_v2_manifest_paths=enable_v2_manifest_paths,
+            detached=detached,
+            max_retries=max_retries,
+        )
+        merged = Transaction(**merged)
+        # This logic is specific to append, which is all that should
+        # be returned here.
+        # TODO: generalize this to all other transaction types.
+        merged.operation["fragments"] = [
+            FragmentMetadata.from_metadata(f) for f in merged.operation["fragments"]
+        ]
+        merged.operation = LanceOperation.Append(**merged.operation)
+        if merged.blobs_op:
+            merged.blobs_op["fragments"] = [
+                FragmentMetadata.from_metadata(f) for f in merged.blobs_op["fragments"]
+            ]
+            merged.blobs_op = LanceOperation.Append(**merged.blobs_op)
+        ds = LanceDataset.__new__(LanceDataset)
+        ds._ds = new_ds
+        ds._uri = new_ds.uri
+        ds._default_scan_options = None
+        return dict(
+            dataset=ds,
+            merged=merged,
+        )
+
     def validate(self):
         """
         Validate the dataset.
@@ -2246,6 +2354,19 @@ def stats(self) -> "LanceStats":
         return LanceStats(self._ds)
 
 
+class BulkCommitResult(TypedDict):
+    dataset: LanceDataset
+    merged: Transaction
+
+
+@dataclass
+class Transaction:
+    read_version: int
+    operation: LanceOperation.BaseOperation
+    uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
+    blobs_op: Optional[LanceOperation.BaseOperation] = None
+
+
 # LanceOperation is a namespace for operations that can be applied to a dataset.
 class LanceOperation:
 
     @staticmethod
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index af1e2974bb..af74f9b2a6 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -864,6 +864,30 @@ def test_append_with_commit(tmp_path: Path):
     assert tbl == expected
 
 
+def test_commit_batch_append():
+    data1 = pa.Table.from_pydict({"a": range(100), "b": range(100)})
+    dataset = lance.write_dataset(data1, "memory://test")
+
+    data2 = pa.Table.from_pydict({"a": range(100, 200), "b": range(100, 200)})
+    fragments2 = lance.fragment.write_fragments(data2, dataset)
+    op2 = lance.LanceOperation.Append(fragments2)
+    txn2 = lance.Transaction(1, op2)
+    data3 = pa.Table.from_pydict({"a": range(200, 300), "b": range(200, 300)})
+    fragments3 = lance.fragment.write_fragments(data3, dataset)
+    op3 = lance.LanceOperation.Append(fragments3)
+    txn3 = lance.Transaction(2, op3)
+
+    result = lance.LanceDataset.commit_batch(dataset, [txn2, txn3])
+    dataset = result["dataset"]
+    assert dataset.version == 2
+    assert len(dataset.get_fragments()) == 3
+    assert dataset.to_table() == pa.concat_tables([data1, data2, data3])
+    merged_txn = result["merged"]
+    assert isinstance(merged_txn, lance.Transaction)
+    assert isinstance(merged_txn.operation, lance.LanceOperation.Append)
+    assert merged_txn.operation.fragments == fragments2 + fragments3
+
+
 def test_delete_with_commit(tmp_path: Path):
     table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
     base_dir = tmp_path / "test"
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index f280c1eb52..40636558be 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -63,15 +63,15 @@ use lance_table::format::Fragment;
 use lance_table::format::Index;
 use lance_table::io::commit::CommitHandler;
 use object_store::path::Path;
-use pyo3::exceptions::{PyStopIteration, PyTypeError};
-use pyo3::prelude::*;
-use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString};
+use pyo3::exceptions::{PyNotImplementedError, PyStopIteration, PyTypeError};
+use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString, PyTuple};
 use pyo3::{
     exceptions::{PyIOError, PyKeyError, PyValueError},
     pyclass,
     types::{IntoPyDict, PyDict},
     PyObject, PyResult,
 };
+use pyo3::{intern, prelude::*};
 use snafu::{location, Location};
 use uuid::Uuid;
 
@@ -365,6 +365,30 @@ impl Operation {
         };
         Ok(Self(op))
     }
+
+    /// Convert to a pydict that can be used as kwargs for the Operation dataclasses
+    fn to_dict<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyDict>> {
+        let dict = PyDict::new_bound(py);
+        match &self.0 {
+            LanceOperation::Append { fragments } => {
+                let fragments = fragments
+                    .iter()
+                    .cloned()
+                    .map(FragmentMetadata::new)
+                    .map(|f| f.into_py(py))
+                    .collect::<Vec<_>>();
+                dict.set_item("fragments", fragments).unwrap();
+            }
+            _ => {
+                return Err(PyNotImplementedError::new_err(format!(
+                    "Operation.to_dict is not implemented for this operation: {:?}",
+                    self.0
+                )));
+            }
+        }
+
+        Ok(dict)
+    }
 }
 
 pub fn transforms_from_python(transforms: &PyAny) -> PyResult<NewColumnTransform> {
@@ -1442,6 +1466,68 @@ impl Dataset {
         })
     }
 
+    #[staticmethod]
+    fn commit_batch<'py>(
+        dest: &Bound<'py, PyAny>,
+        transactions: Vec<Bound<'py, PyAny>>,
+        commit_lock: Option<&'py PyAny>,
+        storage_options: Option<HashMap<String, String>>,
+        enable_v2_manifest_paths: Option<bool>,
+        detached: Option<bool>,
+        max_retries: Option<u32>,
+    ) -> PyResult<Bound<'py, PyTuple>> {
+        let object_store_params =
+            storage_options
+                .as_ref()
+                .map(|storage_options| ObjectStoreParams {
+                    storage_options: Some(storage_options.clone()),
+                    ..Default::default()
+                });
+
+        let commit_handler = commit_lock.map(|commit_lock| {
+            Arc::new(PyCommitLock::new(commit_lock.to_object(commit_lock.py())))
+                as Arc<dyn CommitHandler>
+        });
+
+        let py = dest.py();
+        let dest = if dest.is_instance_of::<Dataset>() {
+            let dataset: Dataset = dest.extract()?;
+            WriteDestination::Dataset(dataset.ds.clone())
+        } else {
+            WriteDestination::Uri(dest.extract()?)
+        };
+
+        let mut builder = CommitBuilder::new(dest)
+            .enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false))
+            .with_detached(detached.unwrap_or(false))
+            .with_max_retries(max_retries.unwrap_or(20));
+
+        if let Some(store_params) = object_store_params {
+            builder = builder.with_store_params(store_params);
+        }
+
+        if let Some(commit_handler) = commit_handler {
+            builder = builder.with_commit_handler(commit_handler);
+        }
+
+        let transactions = transactions
+            .into_iter()
+            .map(|transaction| extract_transaction(&transaction))
+            .collect::<PyResult<Vec<_>>>()?;
+
+        let res = RT
+            .block_on(Some(py), builder.execute_batch(transactions))?
+            .map_err(|err| PyIOError::new_err(err.to_string()))?;
+        let uri = res.dataset.uri().to_string();
+        let ds = Self {
+            ds: Arc::new(res.dataset),
+            uri,
+        };
+        let merged = export_transaction(&res.merged, py)?.to_object(py);
+        let ds = ds.into_py(py);
+        Ok(PyTuple::new_bound(py, [ds, merged]))
+    }
+
     fn validate(&self) -> PyResult<()> {
         RT.block_on(None, self.ds.validate())?
             .map_err(|err| PyIOError::new_err(err.to_string()))
@@ -1965,3 +2051,59 @@ impl UDFCheckpointStore for PyBatchUDFCheckpointWrapper {
         })
     }
 }
+
+/// py_transaction is a dataclass with attributes:
+/// read_version: int
+/// uuid: str
+/// operation: LanceOperation.BaseOperation
+/// blobs_op: Optional[LanceOperation.BaseOperation] = None
+fn extract_transaction(py_transaction: &Bound<PyAny>) -> PyResult<Transaction> {
+    let py = py_transaction.py();
+    let read_version = py_transaction.getattr("read_version")?.extract()?;
+    let uuid = py_transaction.getattr("uuid")?.extract()?;
+    let operation: Operation = py_transaction
+        .getattr("operation")?
+        .call_method0(intern!(py, "_to_inner"))?
+        .extract()?;
+    let operation = operation.0;
+    let blobs_op: Option<Operation> = {
+        let blobs_op: Option<Bound<PyAny>> = py_transaction.getattr("blobs_op")?.extract()?;
+        if let Some(blobs_op) = blobs_op {
+            Some(blobs_op.call_method0(intern!(py, "_to_inner"))?.extract()?)
+        } else {
+            None
+        }
+    };
+    let blobs_op = blobs_op.map(|op| op.0);
+    Ok(Transaction {
+        read_version,
+        uuid,
+        operation,
+        blobs_op,
+        tag: None,
+    })
+}
+
+// Exports to a pydict of kwargs used to instantiate the python Transaction dataclass.
+fn export_transaction<'a>(
+    transaction: &Transaction,
+    py: Python<'a>,
+) -> PyResult<Bound<'a, PyDict>> {
+    let dict = PyDict::new_bound(py);
+    dict.set_item("read_version", transaction.read_version)?;
+    dict.set_item("uuid", transaction.uuid.clone())?;
+    dict.set_item(
+        "operation",
+        Operation(transaction.operation.clone()).to_dict(py)?,
+    )?;
+    dict.set_item(
+        "blobs_op",
+        transaction
+            .blobs_op
+            .clone()
+            .map(Operation)
+            .map(|op| op.to_dict(py))
+            .transpose()?,
+    )?;
+    Ok(dict)
+}
diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs
index 626863a8c3..4b7fb4cea7 100644
--- a/rust/lance/src/dataset/write/commit.rs
+++ b/rust/lance/src/dataset/write/commit.rs
@@ -347,6 +347,74 @@ impl<'a> CommitBuilder<'a> {
             }),
         }
     }
+
+    /// Commit a set of transactions as a single new version.
+    ///
+    /// <div class="warning">
+    ///   Only works for append transactions right now. Other kinds of transactions
+    ///   will be supported in the future.
+    /// </div>
+    pub async fn execute_batch(self, transactions: Vec<Transaction>) -> Result<BatchCommitResult> {
+        if transactions.is_empty() {
+            return Err(Error::InvalidInput {
+                source: "No transactions to commit".into(),
+                location: location!(),
+            });
+        }
+        if transactions
+            .iter()
+            .any(|t| !matches!(t.operation, Operation::Append { .. }))
+        {
+            return Err(Error::NotSupported {
+                source: "Only append transactions are supported in batch commits".into(),
+                location: location!(),
+            });
+        }
+
+        let read_version = transactions.iter().map(|t| t.read_version).min().unwrap();
+        let blob_new_frags = transactions
+            .iter()
+            .flat_map(|t| &t.blobs_op)
+            .flat_map(|b| match b {
+                Operation::Append { fragments } => fragments.clone(),
+                _ => unreachable!(),
+            })
+            .collect::<Vec<_>>();
+        let blobs_op = if blob_new_frags.is_empty() {
+            None
+        } else {
+            Some(Operation::Append {
+                fragments: blob_new_frags,
+            })
+        };
+
+        let merged = Transaction {
+            uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
+            operation: Operation::Append {
+                fragments: transactions
+                    .iter()
+                    .flat_map(|t| match &t.operation {
+                        Operation::Append { fragments } => fragments.clone(),
+                        _ => unreachable!(),
+                    })
+                    .collect(),
+            },
+            read_version,
+            blobs_op,
+            tag: None,
+        };
+        let dataset = self.execute(merged.clone()).await?;
+        Ok(BatchCommitResult { dataset, merged })
+    }
+}
+
+pub struct BatchCommitResult {
+    pub dataset: Dataset,
+    /// The final transaction that was committed.
+    pub merged: Transaction,
+    // TODO: Reject conflicts that need to be retried.
+    // /// Transactions that were rejected due to conflicts.
+    // pub rejected: Vec<Transaction>,
 }
 
 #[cfg(test)]
@@ -363,23 +431,27 @@ mod tests {
 
     use super::*;
 
+    fn sample_fragment() -> Fragment {
+        Fragment {
+            id: 0,
+            files: vec![DataFile {
+                path: "file.lance".to_string(),
+                fields: vec![0],
+                column_indices: vec![0],
+                file_major_version: 2,
+                file_minor_version: 0,
+            }],
+            deletion_file: None,
+            row_id_meta: None,
+            physical_rows: Some(10),
+        }
+    }
+
     fn sample_transaction(read_version: u64) -> Transaction {
         Transaction {
             uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
             operation: Operation::Append {
-                fragments: vec![Fragment {
-                    id: 0,
-                    files: vec![DataFile {
-                        path: "file.lance".to_string(),
-                        fields: vec![0],
-                        column_indices: vec![0],
-                        file_major_version: 2,
-                        file_minor_version: 0,
-                    }],
-                    deletion_file: None,
-                    row_id_meta: None,
-                    physical_rows: Some(10),
-                }],
+                fragments: vec![sample_fragment()],
             },
             read_version,
             blobs_op: None,
@@ -487,4 +559,68 @@ mod tests {
         assert!(reads > 20);
         assert_eq!(writes, 3);
     }
+
+    #[tokio::test]
+    async fn test_commit_batch() {
+        // Create a dataset
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "i",
+            DataType::Int32,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..10_i32))],
+        )
+        .unwrap();
+        let dataset = InsertBuilder::new("memory://test")
+            .execute(vec![batch])
+            .await
+            .unwrap();
+        let dataset = Arc::new(dataset);
+
+        // Attempting to commit empty gives error
+        let res = CommitBuilder::new(dataset.clone())
+            .execute_batch(vec![])
+            .await;
+        assert!(matches!(res, Err(Error::InvalidInput { .. })));
}))); + + // Attempting to commit update gives error + let update_transaction = Transaction { + uuid: uuid::Uuid::new_v4().hyphenated().to_string(), + operation: Operation::Update { + updated_fragments: vec![], + new_fragments: vec![], + removed_fragment_ids: vec![], + }, + read_version: 1, + blobs_op: None, + tag: None, + }; + let res = CommitBuilder::new(dataset.clone()) + .execute_batch(vec![update_transaction]) + .await; + assert!(matches!(res, Err(Error::NotSupported { .. }))); + + // Doing multiple appends includes all. + let append1 = sample_transaction(1); + let append2 = sample_transaction(2); + let mut expected_fragments = vec![]; + if let Operation::Append { fragments } = &append1.operation { + expected_fragments.extend(fragments.clone()); + } + if let Operation::Append { fragments } = &append2.operation { + expected_fragments.extend(fragments.clone()); + } + let res = CommitBuilder::new(dataset.clone()) + .execute_batch(vec![append1.clone(), append2.clone()]) + .await + .unwrap(); + let transaction = res.merged; + assert!( + matches!(transaction.operation, Operation::Append { fragments } if fragments == expected_fragments) + ); + assert_eq!(transaction.read_version, 1); + assert!(transaction.blobs_op.is_none()); + } }