feat: add compaction support to balanced datasets (#3088)
This allows compaction to succeed on the default storage.

Running compaction on the sibling storage can be added in a future PR.
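
As a rough sketch of the feature (adapted from the tests added in this PR; the path "test_ds" and the row counts are arbitrary), compaction on the default storage of a balanced dataset looks roughly like this:

import lance
import pyarrow as pa

schema = pa.schema(
    [
        pa.field(
            "blobs",
            pa.large_binary(),
            metadata={"lance-schema:storage-class": "blob"},
        ),
        pa.field("idx", pa.uint64()),
    ]
)

# Several small appends produce many small fragments in the default storage.
for i in range(4):
    batch = pa.record_batch(
        [
            pa.array([b"0" * 1024 * 1024], pa.large_binary()),
            pa.array([i], pa.uint64()),
        ],
        schema=schema,
    )
    lance.write_dataset(
        pa.Table.from_batches([batch]),
        "test_ds",
        schema=schema,
        mode="append",
    )

ds = lance.dataset("test_ds")
# Compacts fragments in the default (non-blob) storage; the sibling blob
# files are left uncompacted for now (see the TODO in test_compaction below).
ds.optimize.compact_files(max_bytes_per_file=16 * 1024 * 1024)
assert len(ds.get_fragments()) == 1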

In addition, this PR adds quite a few more test cases to make sure that
a balanced dataset either performs as expected or gives a good "not yet
supported" error message.

This PR also reworks the dataset-offset based take (e.g.
`LanceDataset::take`) to reuse the id-based and address-based take paths
(e.g. `TakeBuilder`).

It also fixes a bug in the `TakeBuilder` path where duplicate IDs were
not being handled.

These latter changes are not strictly needed, but they prepare for an
eventual revamp of the take operation to address
#2977.
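
For context, here is a minimal sketch of the intended behavior of the two take paths after this change (assuming a small balanced dataset at "test_ds" like the one above; the duplicate indices are purely illustrative):

ds = lance.dataset("test_ds")

# Offset-based take now goes through the same code path as the id- and
# address-based takes (TakeBuilder).
tbl = ds.take([2, 2, 0], columns=["idx"])
# With the duplicate-handling fix, both copies of offset 2 come back.
assert tbl.num_rows == 3

# Row-id-based take, as exercised by the new tests.
row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid").to_pylist()
assert ds._take_rows(row_ids[:2], columns=["idx"]).num_rows == 2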
westonpace authored Nov 5, 2024
1 parent 387c98c commit 2d3dd67
Showing 8 changed files with 357 additions and 170 deletions.
3 changes: 3 additions & 0 deletions python/python/lance/dataset.py
@@ -3040,6 +3040,9 @@ def compact_files(
max_rows_per_group: int, default 1024
Max number of rows per group. This does not affect which fragments
need compaction, but does affect how they are re-written if selected.
This setting only affects datasets using the legacy storage format.
The newer format does not require row groups.
max_bytes_per_file: Optional[int], default None
Max number of bytes in a single file. This does not affect which
fragments need compaction, but does affect how they are re-written if
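For reference, a minimal sketch of how these parameters might be passed (parameter names are taken from the docstring above; the dataset path is a placeholder):

ds = lance.dataset("test_ds")
ds.optimize.compact_files(
    # Only affects the legacy storage format; the newer format has no row
    # groups, so this value is ignored there.
    max_rows_per_group=1024,
    # Caps the size of files produced when fragments are re-written.
    max_bytes_per_file=16 * 1024 * 1024,
)
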
205 changes: 180 additions & 25 deletions python/python/tests/test_balanced.py
@@ -12,39 +12,45 @@ def big_val():
return b"0" * 1024 * 1024


def make_table(offset, num_rows, big_val):
end = offset + num_rows
values = pa.array([big_val for _ in range(num_rows)], pa.large_binary())
idx = pa.array(range(offset, end), pa.uint64())
table = pa.record_batch(
[values, idx],
schema=pa.schema(
[
pa.field(
"blobs",
pa.large_binary(),
metadata={
"lance-schema:storage-class": "blob",
},
),
pa.field("idx", pa.uint64()),
]
),
)
return table


# 16 batches of 8 rows = 128 rows
def balanced_datagen(big_val):
for batch_idx in range(16):
start = batch_idx * 8
end = start + 8
values = pa.array([big_val for _ in range(start, end)], pa.large_binary())
idx = pa.array(range(start, end), pa.uint64())
table = pa.record_batch(
[values, idx],
schema=pa.schema(
[
pa.field(
"blobs",
pa.large_binary(),
metadata={
"lance-schema:storage-class": "blob",
},
),
pa.field("idx", pa.uint64()),
]
),
)
yield table
def balanced_datagen(big_val, rows_per_batch, num_batches, offset=0):
for batch_idx in range(num_batches):
start = offset + (batch_idx * rows_per_batch)
yield make_table(start, rows_per_batch, big_val)


@pytest.fixture
def balanced_dataset(tmp_path, big_val):
# 16 MiB per file, 128 total MiB, so we should have 8 blob files
#
# In addition, max_rows_per_file=64 means we should get 2 regular files
schema = next(iter(balanced_datagen(big_val))).schema
rows_per_batch = 8
num_batches = 16
schema = next(iter(balanced_datagen(big_val, 1, 1))).schema
return lance.write_dataset(
balanced_datagen(big_val),
balanced_datagen(big_val, rows_per_batch, num_batches),
tmp_path / "test_ds",
max_bytes_per_file=16 * 1024 * 1024,
max_rows_per_file=64,
@@ -64,8 +70,10 @@ def test_append_then_take(balanced_dataset, tmp_path, big_val):
)

# Now verify we can append some data
rows_per_batch = 8
num_batches = 16
ds = lance.write_dataset(
balanced_datagen(big_val),
balanced_datagen(big_val, rows_per_batch, num_batches),
tmp_path / "test_ds",
max_bytes_per_file=32 * 1024 * 1024,
schema=balanced_dataset.schema,
@@ -118,6 +126,153 @@ def test_delete(balanced_dataset):
assert len(balanced_dataset._take_rows(range(20, 80), columns=["blobs"])) == 20


def test_scan(balanced_dataset):
# Scan without any special arguments should only return non-blob columns
expected = pa.table({"idx": pa.array(range(128), pa.uint64())})
assert balanced_dataset.to_table() == expected
assert balanced_dataset.to_table(columns=["idx"]) == expected
# Can filter on regular columns
assert balanced_dataset.to_table(columns=["idx"], filter="idx < 1000") == expected

# Scan with blob column specified should fail (currently, will support in future
# but need to make sure it fails currently so users don't shoot themselves in the
# foot)
with pytest.raises(
ValueError, match="Not supported.*Scanning.*non-default storage"
):
balanced_dataset.to_table(columns=["idx", "blobs"])
with pytest.raises(
ValueError, match="Not supported.*Scanning.*non-default storage"
):
balanced_dataset.to_table(columns=["blobs"])

# Can't filter on blob columns either
with pytest.raises(
ValueError,
match="Not supported.*non-default storage columns cannot be used as filters",
):
balanced_dataset.to_table(columns=["idx"], filter="blobs IS NOT NULL")


def test_compaction(tmp_path, big_val):
# Make a bunch of small 1-row writes
schema = next(iter(balanced_datagen(big_val, 1, 1))).schema
for write_idx in range(40):
lance.write_dataset(
balanced_datagen(big_val, 1, 1, offset=write_idx),
tmp_path / "test_ds",
max_bytes_per_file=16 * 1024 * 1024,
max_rows_per_file=64,
schema=schema,
mode="append",
)
# Run compaction. Normal storage should compact to 1 file. Blob storage
# should compact to 3 files (40MB over 16MB per file)
ds = lance.dataset(tmp_path / "test_ds")
ds.optimize.compact_files(max_bytes_per_file=16 * 1024 * 1024)

assert len(ds.get_fragments()) == 1

# TODO: Add support for compacting the blob files. For now, we just leave them
# uncompacted
assert len(list((tmp_path / "test_ds" / "_blobs" / "data").iterdir())) == 40

# Make sure we can still scan / take

assert ds.to_table() == pa.table(
{
"idx": pa.array(range(40), pa.uint64()),
}
)
row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid")
assert row_ids.to_pylist() == list(range(40))

assert ds._take_rows(row_ids.to_pylist(), columns=["idx"]) == pa.table(
{
"idx": pa.array(range(40), pa.uint64()),
}
)
assert ds._take_rows(row_ids.to_pylist(), columns=["blobs"]) == pa.table(
{
"blobs": pa.array([big_val for _ in range(40)], pa.large_binary()),
}
)


def test_schema(balanced_dataset):
# Schema should contain blob columns
assert balanced_dataset.schema == pa.schema(
[
pa.field(
"blobs",
pa.large_binary(),
metadata={
"lance-schema:storage-class": "blob",
},
),
pa.field("idx", pa.uint64()),
]
)


def test_sample(balanced_dataset):
assert balanced_dataset.sample(10, columns=["idx"]).num_rows == 10
# Not the most obvious error but hopefully not long lived
with pytest.raises(
OSError, match="Not supported.*mapping from row addresses to row ids"
):
assert balanced_dataset.sample(10).num_rows == 10
with pytest.raises(
OSError, match="Not supported.*mapping from row addresses to row ids"
):
assert balanced_dataset.sample(10, columns=["blobs"]).num_rows == 10


def test_add_columns(tmp_path, balanced_dataset):
# Adding columns should be fine as long as we don't try to use the blob
# column in any way

balanced_dataset.add_columns(
{
"idx2": "idx * 2",
}
)

assert balanced_dataset.to_table() == pa.table(
{
"idx": pa.array(range(128), pa.uint64()),
"idx2": pa.array(range(0, 256, 2), pa.uint64()),
}
)

with pytest.raises(
OSError, match="Not supported.*adding columns.*scanning non-default storage"
):
balanced_dataset.add_columns({"blobs2": "blobs"})


def test_unsupported(balanced_dataset, big_val):
# The following operations are not yet supported and we need to make
# sure they fail with a useful error message

# Updates & merge-insert are not supported. They add new rows and we
# will need to make sure the sibling datasets are kept in sync.

with pytest.raises(
ValueError, match="Not supported.*Updating.*non-default storage"
):
balanced_dataset.update({"idx": "0"})

with pytest.raises(
# This error could be nicer but it's fine for now
OSError,
match="Not supported.*Scanning.*non-default storage",
):
balanced_dataset.merge_insert("idx").when_not_matched_insert_all().execute(
make_table(0, 1, big_val)
)


# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
# want to test partial appends. We need to make sure an append of
# non-blob data is supported. In order to do this we need to make
11 changes: 1 addition & 10 deletions rust/lance/src/dataset.rs
@@ -1199,12 +1199,7 @@ impl Dataset {
row_indices: &[u64],
projection: impl Into<ProjectionRequest>,
) -> Result<RecordBatch> {
take::take(
self,
row_indices,
&projection.into().into_projection_plan(self.schema())?,
)
.await
take::take(self, row_indices, projection.into()).await
}

/// Take Rows by the internal ROW ids.
@@ -1605,10 +1600,6 @@ impl Dataset {
.collect())
}

// Leaving this here so it is more obvious to future readers that we can do this and
// someone doesn't go off and create a new function to do this. Delete this comment
// if you use this method.
#[allow(unused)]
pub(crate) async fn filter_deleted_addresses(&self, addrs: &[u64]) -> Result<Vec<u64>> {
self.filter_addr_or_ids(addrs, addrs).await
}
9 changes: 9 additions & 0 deletions rust/lance/src/dataset/fragment.rs
@@ -1109,6 +1109,7 @@ impl FileFragment {
projection: &Schema,
with_row_address: bool,
) -> Result<RecordBatch> {
println!("Fragment take (offsets={:?}", row_offsets);
let reader = self
.open(
projection,
@@ -1186,6 +1187,14 @@ impl FileFragment {
}
schema = schema.project(&projection)?;
}

if schema.fields.iter().any(|f| !f.is_default_storage()) {
return Err(Error::NotSupported {
source: "adding columns whose value depends on scanning non-default storage".into(),
location: location!(),
});
}

// If there is no projection, we at least need to read the row addresses
with_row_addr |= schema.fields.is_empty();

37 changes: 28 additions & 9 deletions rust/lance/src/dataset/scanner.rs
@@ -426,9 +426,16 @@ impl Scanner {
&mut self,
columns: &[(impl AsRef<str>, impl AsRef<str>)],
) -> Result<&mut Self> {
let physical_schema = self.scan_output_schema(true)?;
let base_schema = self.scan_output_schema(self.dataset.schema(), true)?;
self.projection_plan =
ProjectionPlan::try_new(&physical_schema, columns, /*load_blobs=*/ false)?;
ProjectionPlan::try_new(&base_schema, columns, /*load_blobs=*/ false)?;
if self.projection_plan.sibling_schema.is_some() {
return Err(Error::NotSupported {
source: "Scanning columns with non-default storage class is not yet supported"
.into(),
location: location!(),
});
}
Ok(self)
}

@@ -859,15 +866,17 @@ impl Scanner {
///
/// This includes columns that are added by the scan but don't exist in the dataset
/// schema (e.g. _distance, _rowid, _rowaddr)
pub(crate) fn scan_output_schema(&self, force_row_id: bool) -> Result<Arc<Schema>> {
pub(crate) fn scan_output_schema(
&self,
base_schema: &Schema,
force_row_id: bool,
) -> Result<Arc<Schema>> {
let extra_columns = self.get_extra_columns(force_row_id);

let schema = if !extra_columns.is_empty() {
self.projection_plan
.physical_schema
.merge(&ArrowSchema::new(extra_columns))?
base_schema.merge(&ArrowSchema::new(extra_columns))?
} else {
self.projection_plan.physical_schema.as_ref().clone()
base_schema.clone()
};

// drop metadata
@@ -888,7 +897,10 @@
// Append the extra columns
let mut output_expr = self.projection_plan.to_physical_exprs()?;

let physical_schema = ArrowSchema::from(self.scan_output_schema(false)?.as_ref());
let physical_schema = ArrowSchema::from(
self.scan_output_schema(&self.projection_plan.physical_schema, false)?
.as_ref(),
);

// distance goes before the row_id column
if self.nearest.is_some() && output_expr.iter().all(|(_, name)| name != DIST_COL) {
@@ -1043,6 +1055,12 @@ impl Scanner {
// which do not exist in the dataset schema but are added by the scan. We can ignore
// those as eager columns.
let filter_schema = self.dataset.schema().project_or_drop(&columns)?;
if filter_schema.fields.iter().any(|f| !f.is_default_storage()) {
return Err(Error::NotSupported {
source: "non-default storage columns cannot be used as filters".into(),
location: location!(),
});
}
let physical_schema = self.projection_plan.physical_schema.clone();
let remaining_schema = physical_schema.exclude(&filter_schema)?;

@@ -1367,7 +1385,8 @@ impl Scanner {
}

// Stage 5: take remaining columns required for projection
let physical_schema = self.scan_output_schema(false)?;
let physical_schema =
self.scan_output_schema(&self.projection_plan.physical_schema, false)?;
let remaining_schema = physical_schema.exclude(plan.schema().as_ref())?;
if !remaining_schema.fields.is_empty() {
plan = self.take(plan, &remaining_schema, self.batch_readahead)?;