feat: add commit_batch API (#3142)
This allows users to commit a sequence of transactions as one new
version. Right now, this only supports Append transactions, which are
trivial to merge.

Closes #3097
wjones127 authored Nov 21, 2024
1 parent 920b191 commit 0cda59a
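
For example, several independently prepared appends can be staged as Transaction objects and committed together as one new version. A minimal sketch, adapted from the test added in this change (the URI and table contents are illustrative):

import lance
import pyarrow as pa
from lance.fragment import write_fragments

dataset = lance.write_dataset(pa.table({"a": [1, 2]}), "memory://demo")

# Stage two independent appends as transactions against version 1.
txns = []
for lo, hi in [(3, 5), (5, 7)]:
    data = pa.table({"a": list(range(lo, hi))})
    fragments = write_fragments(data, dataset)
    txns.append(lance.Transaction(1, lance.LanceOperation.Append(fragments)))

# Both appends are merged and committed as a single new version.
result = lance.LanceDataset.commit_batch(dataset, txns)
assert result["dataset"].version == 2
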
Showing 5 changed files with 442 additions and 17 deletions.
2 changes: 2 additions & 0 deletions python/python/lance/__init__.py
@@ -11,6 +11,7 @@
LanceOperation,
LanceScanner,
MergeInsertBuilder,
Transaction,
__version__,
batch_udf,
write_dataset,
@@ -38,6 +39,7 @@
"LanceOperation",
"LanceScanner",
"MergeInsertBuilder",
"Transaction",
"__version__",
"write_dataset",
"schema_to_json",
123 changes: 122 additions & 1 deletion python/python/lance/dataset.py
@@ -4,11 +4,13 @@
from __future__ import annotations

import copy
import dataclasses
import json
import logging
import os
import random
import time
import uuid
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -23,6 +25,7 @@
List,
Literal,
Optional,
Sequence,
Set,
TypedDict,
Union,
@@ -2171,7 +2174,6 @@ def commit(
2 3 c
3 4 d
"""
# TODO: mode is never used!
if isinstance(base_uri, Path):
base_uri = str(base_uri)
elif isinstance(base_uri, LanceDataset):
@@ -2211,6 +2213,112 @@ def commit(
ds._default_scan_options = None
return ds

@staticmethod
def commit_batch(
dest: Union[str, Path, LanceDataset],
transactions: Sequence[Transaction],
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = False,
max_retries: int = 20,
) -> BulkCommitResult:
"""Create a new version of dataset with multiple transactions.
This method is an advanced method which allows users to describe a change
that has been made to the data files. This method is not needed when using
Lance to apply changes (e.g. when using :py:class:`LanceDataset` or
:py:func:`write_dataset`.)
Parameters
----------
dest: str, Path, or LanceDataset
The base uri of the dataset, or the dataset object itself. Using
the dataset object can be more efficient because it can re-use the
file metadata cache.
transactions: Sequence[Transaction]
The transactions to apply to the dataset. These will be merged into
a single transaction and applied to the dataset. Note: Only append
transactions are currently supported. Other transaction types will be
supported in the future.
commit_lock : CommitLock, optional
A custom commit lock. Only needed if your object store does not support
atomic commits. See the user guide for more details.
storage_options : dict, optional
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
enable_v2_manifest_paths : bool, optional
If True, and this is a new dataset, uses the new V2 manifest paths.
These paths provide more efficient opening of datasets with many
versions on object stores. This parameter has no effect if the dataset
already exists. To migrate an existing dataset, instead use the
:meth:`migrate_manifest_paths_v2` method. Default is False. WARNING:
turning this on will make the dataset unreadable for older versions
of Lance (prior to 0.17.0).
detached : bool, optional
If True, then the commit will not be part of the dataset lineage. It will
never show up as the latest dataset and the only way to check it out in the
future will be to specifically check it out by version. The version will be
a random version that is only unique amongst detached commits. The caller
should store this somewhere as there will be no other way to obtain it in
the future.
max_retries : int
The maximum number of retries to perform when committing the dataset.

Returns
-------
dict with keys:
dataset: LanceDataset
A new version of the Lance dataset.
merged: Transaction
The merged transaction that was applied to the dataset.
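
Examples
--------
A minimal sketch, mirroring the unit test added alongside this API (the
URI and table contents are illustrative):

>>> import lance
>>> import pyarrow as pa
>>> from lance.fragment import write_fragments
>>> dataset = lance.write_dataset(pa.table({"a": [1, 2]}), "memory://ex")
>>> fragments = write_fragments(pa.table({"a": [3, 4]}), dataset)
>>> txn = lance.Transaction(1, lance.LanceOperation.Append(fragments))
>>> result = lance.LanceDataset.commit_batch(dataset, [txn])
>>> result["dataset"].version
2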
"""
if isinstance(dest, Path):
dest = str(dest)
elif isinstance(dest, LanceDataset):
dest = dest._ds
elif not isinstance(dest, str):
raise TypeError(
f"base_uri must be str, Path, or LanceDataset, got {type(dest)}"
)

if commit_lock:
if not callable(commit_lock):
raise TypeError(
f"commit_lock must be a function, got {type(commit_lock)}"
)

new_ds, merged = _Dataset.commit_batch(
dest,
transactions,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
merged = Transaction(**merged)
# This logic is specific to append, which is all that should
# be returned here.
# TODO: generalize this to all other transaction types.
merged.operation["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.operation["fragments"]
]
merged.operation = LanceOperation.Append(**merged.operation)
if merged.blobs_op:
merged.blobs_op["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.blobs_op["fragments"]
]
merged.blobs_op = LanceOperation.Append(**merged.blobs_op)
ds = LanceDataset.__new__(LanceDataset)
ds._ds = new_ds
ds._uri = new_ds.uri
ds._default_scan_options = None
return dict(
dataset=ds,
merged=merged,
)

def validate(self):
"""
Validate the dataset.
Expand Down Expand Up @@ -2246,6 +2354,19 @@ def stats(self) -> "LanceStats":
return LanceStats(self._ds)


class BulkCommitResult(TypedDict):
dataset: LanceDataset
merged: Transaction


@dataclass
class Transaction:
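"""An operation to commit against a known version of the dataset.

``read_version`` is the dataset version the transaction is based on,
``operation`` describes the change itself, ``uuid`` uniquely identifies this
transaction, and ``blobs_op``, if set, is the corresponding operation on the
dataset's blob data. A sequence of these can be committed as one new
version via :py:meth:`LanceDataset.commit_batch`.
"""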
read_version: int
operation: LanceOperation.BaseOperation
uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
blobs_op: Optional[LanceOperation.BaseOperation] = None


# LanceOperation is a namespace for operations that can be applied to a dataset.
class LanceOperation:
@staticmethod
24 changes: 24 additions & 0 deletions python/python/tests/test_dataset.py
@@ -864,6 +864,30 @@ def test_append_with_commit(tmp_path: Path):
assert tbl == expected


def test_commit_batch_append():
data1 = pa.Table.from_pydict({"a": range(100), "b": range(100)})
dataset = lance.write_dataset(data1, "memory://test")

data2 = pa.Table.from_pydict({"a": range(100, 200), "b": range(100, 200)})
fragments2 = lance.fragment.write_fragments(data2, dataset)
op2 = lance.LanceOperation.Append(fragments2)
txn2 = lance.Transaction(1, op2)
data3 = pa.Table.from_pydict({"a": range(200, 300), "b": range(200, 300)})
fragments3 = lance.fragment.write_fragments(data3, dataset)
op3 = lance.LanceOperation.Append(fragments3)
txn3 = lance.Transaction(2, op3)

result = lance.LanceDataset.commit_batch(dataset, [txn2, txn3])
dataset = result["dataset"]
assert dataset.version == 2
assert len(dataset.get_fragments()) == 3
assert dataset.to_table() == pa.concat_tables([data1, data2, data3])
merged_txn = result["merged"]
assert isinstance(merged_txn, lance.Transaction)
assert isinstance(merged_txn.operation, lance.LanceOperation.Append)
assert merged_txn.operation.fragments == fragments2 + fragments3


def test_delete_with_commit(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"
148 changes: 145 additions & 3 deletions python/src/dataset.rs
@@ -63,15 +63,15 @@ use lance_table::format::Fragment;
use lance_table::format::Index;
use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
-use pyo3::exceptions::{PyStopIteration, PyTypeError};
-use pyo3::prelude::*;
-use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString};
+use pyo3::exceptions::{PyNotImplementedError, PyStopIteration, PyTypeError};
+use pyo3::types::{PyBytes, PyInt, PyList, PySet, PyString, PyTuple};
use pyo3::{
exceptions::{PyIOError, PyKeyError, PyValueError},
pyclass,
types::{IntoPyDict, PyDict},
PyObject, PyResult,
};
use pyo3::{intern, prelude::*};
use snafu::{location, Location};
use uuid::Uuid;

@@ -365,6 +365,30 @@ impl Operation {
};
Ok(Self(op))
}

/// Convert to a pydict that can be used as kwargs for the Operation dataclasses
fn to_dict<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyDict>> {
let dict = PyDict::new_bound(py);
match &self.0 {
LanceOperation::Append { fragments } => {
let fragments = fragments
.iter()
.cloned()
.map(FragmentMetadata::new)
.map(|f| f.into_py(py))
.collect::<Vec<_>>();
dict.set_item("fragments", fragments).unwrap();
}
_ => {
return Err(PyNotImplementedError::new_err(format!(
"Operation.to_dict is not implemented for this operation: {:?}",
self.0
)));
}
}

Ok(dict)
}
}

pub fn transforms_from_python(transforms: &PyAny) -> PyResult<NewColumnTransform> {
@@ -1442,6 +1466,68 @@ impl Dataset {
})
}

#[staticmethod]
fn commit_batch<'py>(
dest: &Bound<'py, PyAny>,
transactions: Vec<Bound<'py, PyAny>>,
commit_lock: Option<&'py PyAny>,
storage_options: Option<HashMap<String, String>>,
enable_v2_manifest_paths: Option<bool>,
detached: Option<bool>,
max_retries: Option<u32>,
) -> PyResult<Bound<'py, PyTuple>> {
let object_store_params =
storage_options
.as_ref()
.map(|storage_options| ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
});

let commit_handler = commit_lock.map(|commit_lock| {
Arc::new(PyCommitLock::new(commit_lock.to_object(commit_lock.py())))
as Arc<dyn CommitHandler>
});

let py = dest.py();
let dest = if dest.is_instance_of::<Dataset>() {
let dataset: Dataset = dest.extract()?;
WriteDestination::Dataset(dataset.ds.clone())
} else {
WriteDestination::Uri(dest.extract()?)
};

let mut builder = CommitBuilder::new(dest)
.enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false))
.with_detached(detached.unwrap_or(false))
.with_max_retries(max_retries.unwrap_or(20));

if let Some(store_params) = object_store_params {
builder = builder.with_store_params(store_params);
}

if let Some(commit_handler) = commit_handler {
builder = builder.with_commit_handler(commit_handler);
}

let transactions = transactions
.into_iter()
.map(|transaction| extract_transaction(&transaction))
.collect::<PyResult<Vec<_>>>()?;

let res = RT
.block_on(Some(py), builder.execute_batch(transactions))?
.map_err(|err| PyIOError::new_err(err.to_string()))?;
let uri = res.dataset.uri().to_string();
let ds = Self {
ds: Arc::new(res.dataset),
uri,
};
let merged = export_transaction(&res.merged, py)?.to_object(py);
let ds = ds.into_py(py);
Ok(PyTuple::new_bound(py, [ds, merged]))
}

fn validate(&self) -> PyResult<()> {
RT.block_on(None, self.ds.validate())?
.map_err(|err| PyIOError::new_err(err.to_string()))
@@ -1965,3 +2051,59 @@ impl UDFCheckpointStore for PyBatchUDFCheckpointWrapper {
})
}
}

/// py_transaction is a dataclass with attributes
/// read_version: int
/// uuid: str
/// operation: LanceOperation.BaseOperation
/// blobs_op: Optional[LanceOperation.BaseOperation] = None
fn extract_transaction(py_transaction: &Bound<PyAny>) -> PyResult<Transaction> {
let py = py_transaction.py();
let read_version = py_transaction.getattr("read_version")?.extract()?;
let uuid = py_transaction.getattr("uuid")?.extract()?;
let operation: Operation = py_transaction
.getattr("operation")?
.call_method0(intern!(py, "_to_inner"))?
.extract()?;
let operation = operation.0;
let blobs_op: Option<Operation> = {
let blobs_op: Option<Bound<PyAny>> = py_transaction.getattr("blobs_op")?.extract()?;
if let Some(blobs_op) = blobs_op {
Some(blobs_op.call_method0(intern!(py, "_to_inner"))?.extract()?)
} else {
None
}
};
let blobs_op = blobs_op.map(|op| op.0);
Ok(Transaction {
read_version,
uuid,
operation,
blobs_op,
tag: None,
})
}

// Exports to a pydict of kwargs used to instantiate the Python Transaction dataclass.
fn export_transaction<'a>(
transaction: &Transaction,
py: Python<'a>,
) -> PyResult<Bound<'a, PyDict>> {
let dict = PyDict::new_bound(py);
dict.set_item("read_version", transaction.read_version)?;
dict.set_item("uuid", transaction.uuid.clone())?;
dict.set_item(
"operation",
Operation(transaction.operation.clone()).to_dict(py)?,
)?;
dict.set_item(
"blobs_op",
transaction
.blobs_op
.clone()
.map(Operation)
.map(|op| op.to_dict(py))
.transpose()?,
)?;
Ok(dict)
}
