Skip to content

Commit

Permalink
feat(java): support get real data size for lance spark statistics int…
Browse files Browse the repository at this point in the history
…erface (#3337)

With #3328, now we can calculate the dataset disk size.
  • Loading branch information
SaintBacchus authored Jan 5, 2025
1 parent dbf9139 commit 1d40479
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 7 deletions.
43 changes: 43 additions & 0 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use jni::sys::{jboolean, jint};
use jni::sys::{jbyteArray, jlong};
use jni::{objects::JObject, JNIEnv};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
use lance::dataset::transaction::Operation;
use lance::dataset::{ColumnAlteration, Dataset, ProjectionRequest, ReadParams, WriteParams};
use lance::io::{ObjectStore, ObjectStoreParams};
Expand Down Expand Up @@ -154,6 +155,11 @@ impl BlockingDataset {
Ok(rows)
}

pub fn calculate_data_stats(&self) -> Result<DataStatistics> {
let stats = RT.block_on(Arc::new(self.clone().inner).calculate_data_stats())?;
Ok(stats)
}

pub fn list_indexes(&self) -> Result<Arc<Vec<Index>>> {
let indexes = RT.block_on(self.inner.load_indices())?;
Ok(indexes)
Expand Down Expand Up @@ -725,6 +731,43 @@ fn inner_count_rows(
dataset_guard.count_rows(filter)
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeGetDataStatistics<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
) -> JObject<'local> {
ok_or_throw!(env, inner_get_data_statistics(&mut env, java_dataset))
}

fn inner_get_data_statistics<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
) -> Result<JObject<'local>> {
let stats = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.calculate_data_stats()?
};
let data_stats = env.new_object("com/lancedb/lance/ipc/DataStatistics", "()V", &[])?;

for field in stats.fields {
let id = field.id as jint;
let byte_size = field.bytes_on_disk as jlong;
let filed_jobj = env.new_object(
"com/lancedb/lance/ipc/FieldStatistics",
"(IJ)V",
&[JValue::Int(id), JValue::Long(byte_size)],
)?;
env.call_method(
&data_stats,
"addFiledStatistics",
"(Lcom/lancedb/lance/ipc/FieldStatistics;)V",
&[JValue::Object(&filed_jobj)],
)?;
}
Ok(data_stats)
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeListIndexes<'local>(
mut env: JNIEnv<'local>,
Expand Down
20 changes: 20 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.IndexType;
import com.lancedb.lance.ipc.DataStatistics;
import com.lancedb.lance.ipc.LanceScanner;
import com.lancedb.lance.ipc.ScanOptions;
import com.lancedb.lance.schema.ColumnAlteration;
Expand Down Expand Up @@ -450,6 +451,25 @@ public long countRows(String filter) {

private native long nativeCountRows(Optional<String> filter);

/**
* Calculate the size of the dataset.
*
* @return the size of the dataset
*/
public long calculateDataSize() {
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
return nativeGetDataStatistics().getDataSize();
}
}

/**
* Calculate the statistics of the dataset.
*
* @return the statistics of the dataset
*/
private native DataStatistics nativeGetDataStatistics();

/**
* Get all fragments in this dataset.
*
Expand Down
45 changes: 45 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/ipc/DataStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lance.ipc;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class DataStatistics implements Serializable {
private final List<FieldStatistics> fields;

public DataStatistics() {
this.fields = new ArrayList<>();
}

// used for rust to add field statistics
public void addFiledStatistics(FieldStatistics fieldStatistics) {
fields.add(fieldStatistics);
}

public List<FieldStatistics> getFields() {
return fields;
}

// get total data size of the whole dataset in bytes
public long getDataSize() {
return fields.stream().mapToLong(FieldStatistics::getDataSize).sum();
}

@Override
public String toString() {
return "DataStatistics{" + "fields=" + fields + '}';
}
}
40 changes: 40 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/ipc/FieldStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lance.ipc;

import java.io.Serializable;

public class FieldStatistics implements Serializable {
private final int id;
// The size of the data in bytes
private final long dataSize;

public FieldStatistics(int id, long dataSize) {
this.id = id;
this.dataSize = dataSize;
}

public int getId() {
return id;
}

public long getDataSize() {
return dataSize;
}

@Override
public String toString() {
return "FieldStatistics{" + "id=" + id + ", dataSize=" + dataSize + '}';
}
}
15 changes: 15 additions & 0 deletions java/core/src/test/java/com/lancedb/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,19 @@ void testCountRows() {
}
}
}

@Test
void testCalculateDataSize() {
String testMethodName = new Object() {}.getClass().getEnclosingMethod().getName();
String datasetPath = tempDir.resolve(testMethodName).toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
dataset = testDataset.createEmptyDataset();

try (Dataset dataset2 = testDataset.write(1, 5)) {
assertEquals(100, dataset2.calculateDataSize());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ public static Optional<Long> getDatasetRowCount(LanceConfig config) {
}
}

public static Optional<Long> getDatasetDataSize(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
try (Dataset dataset = Dataset.open(allocator, uri, options)) {
return Optional.of(dataset.calculateDataSize());
} catch (IllegalArgumentException e) {
// dataset not found
return Optional.empty();
}
}

public static List<Integer> getFragmentIds(LanceConfig config) {
String uri = config.getDatasetUri();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,22 @@
import com.lancedb.lance.spark.utils.Optional;

import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.types.StructType;

import java.util.OptionalLong;

public class LanceStatistics implements Statistics {
private final Optional<Long> rowNumber;
private final Optional<StructType> schema;
private final Optional<Long> dataBytesSize;

public LanceStatistics(LanceConfig config) {
this.rowNumber = LanceDatasetAdapter.getDatasetRowCount(config);
this.schema = LanceDatasetAdapter.getSchema(config);
this.dataBytesSize = LanceDatasetAdapter.getDatasetDataSize(config);
}

@Override
public OptionalLong sizeInBytes() {
// TODO: Support quickly get the bytes on disk for the lance dataset
// Now use schema to infer the byte size for simple
if (rowNumber.isPresent()) {
return OptionalLong.of(schema.get().defaultSize() * rowNumber.get());
if (dataBytesSize.isPresent()) {
return OptionalLong.of(dataBytesSize.get());
} else {
return OptionalLong.empty();
}
Expand Down

0 comments on commit 1d40479

Please sign in to comment.