Skip to content

Commit

Permalink
feat: support write multi fragments or empty fragment in one spark ta…
Browse files Browse the repository at this point in the history
…sk (#3183)

Currently `FileFragment::create` only supports creating a single file fragment, which
causes two issues in the Spark connector:
1. If the Spark task is empty, this API raises an exception since there
is no data to create the fragment.
2. If the task data stream is very large, it generates a huge file
in Lance format, which is not friendly for Spark parallelism.

So I removed the assigned fragment id and added a new method named
`FileFragment::create_fragments` to generate zero, one, or multiple fragments.


![image](https://github.com/user-attachments/assets/54fb2497-8163-4652-9e0b-d50a88fade53)
  • Loading branch information
SaintBacchus authored Dec 2, 2024
1 parent dc9afbb commit 39222ec
Show file tree
Hide file tree
Showing 13 changed files with 281 additions and 81 deletions.
15 changes: 1 addition & 14 deletions java/core/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use lance_datafusion::utils::StreamingWriteSource;
use crate::error::{Error, Result};
use crate::{
blocking_dataset::{BlockingDataset, NATIVE_DATASET},
ffi::JNIEnvExt,
traits::FromJString,
utils::extract_write_params,
RT,
Expand Down Expand Up @@ -77,7 +76,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -91,7 +89,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
dataset_uri,
arrow_array_addr,
arrow_schema_addr,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -108,7 +105,6 @@ fn inner_create_with_ffi_array<'local>(
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -131,7 +127,6 @@ fn inner_create_with_ffi_array<'local>(
create_fragment(
env,
dataset_uri,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -147,7 +142,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
_obj: JObject,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -160,7 +154,6 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
&mut env,
dataset_uri,
arrow_array_stream_addr,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -176,7 +169,6 @@ fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -189,7 +181,6 @@ fn inner_create_with_ffi_stream<'local>(
create_fragment(
env,
dataset_uri,
fragment_id,
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
Expand All @@ -203,7 +194,6 @@ fn inner_create_with_ffi_stream<'local>(
fn create_fragment<'a>(
env: &mut JNIEnv<'a>,
dataset_uri: JString,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
Expand All @@ -213,8 +203,6 @@ fn create_fragment<'a>(
) -> Result<JString<'a>> {
let path_str = dataset_uri.extract(env)?;

let fragment_id_opts = env.get_int_opt(&fragment_id)?;

let write_params = extract_write_params(
env,
&max_rows_per_file,
Expand All @@ -223,9 +211,8 @@ fn create_fragment<'a>(
&mode,
&storage_options_obj,
)?;
let fragment = RT.block_on(FileFragment::create(
let fragment = RT.block_on(FileFragment::create_fragments(
&path_str,
fragment_id_opts.unwrap_or(0) as usize,
source,
Some(write_params),
))?;
Expand Down
4 changes: 2 additions & 2 deletions java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub fn extract_write_params(
if let Some(mode_val) = env.get_string_opt(mode)? {
write_params.mode = WriteMode::try_from(mode_val.as_str())?;
}
// Java code always sets the data storage version to Legacy for now
write_params.data_storage_version = Some(LanceFileVersion::Legacy);
// Java code always sets the data storage version to stable for now
write_params.data_storage_version = Some(LanceFileVersion::Stable);
let jmap = JMap::from_env(env, storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
Expand Down
29 changes: 13 additions & 16 deletions java/core/src/main/java/com/lancedb/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.lancedb.lance;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.arrow.c.ArrowArray;
Expand All @@ -36,43 +37,39 @@ public class Fragment {
* @param datasetUri the dataset uri
* @param allocator the buffer allocator
* @param root the vector schema root
* @param fragmentId the fragment id
* @param params the write params
* @return the fragment metadata
*/
public static FragmentMetadata create(String datasetUri, BufferAllocator allocator,
VectorSchemaRoot root, Optional<Integer> fragmentId, WriteParams params) {
public static List<FragmentMetadata> create(String datasetUri, BufferAllocator allocator,
VectorSchemaRoot root, WriteParams params) {
Preconditions.checkNotNull(datasetUri);
Preconditions.checkNotNull(allocator);
Preconditions.checkNotNull(root);
Preconditions.checkNotNull(fragmentId);
Preconditions.checkNotNull(params);
try (ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
ArrowArray arrowArray = ArrowArray.allocateNew(allocator)) {
Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema);
return FragmentMetadata.fromJson(createWithFfiArray(datasetUri, arrowArray.memoryAddress(),
arrowSchema.memoryAddress(), fragmentId, params.getMaxRowsPerFile(),
params.getMaxRowsPerGroup(), params.getMaxBytesPerFile(), params.getMode(),
params.getStorageOptions()));
return FragmentMetadata.fromJsonArray(createWithFfiArray(datasetUri,
arrowArray.memoryAddress(), arrowSchema.memoryAddress(),
params.getMaxRowsPerFile(), params.getMaxRowsPerGroup(), params.getMaxBytesPerFile(),
params.getMode(), params.getStorageOptions()));
}
}

/**
* Create a fragment from the given arrow stream.
* @param datasetUri the dataset uri
* @param stream the arrow stream
* @param fragmentId the fragment id
* @param params the write params
* @return the fragment metadata
*/
public static FragmentMetadata create(String datasetUri, ArrowArrayStream stream,
Optional<Integer> fragmentId, WriteParams params) {
public static List<FragmentMetadata> create(String datasetUri, ArrowArrayStream stream,
WriteParams params) {
Preconditions.checkNotNull(datasetUri);
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(fragmentId);
Preconditions.checkNotNull(params);
return FragmentMetadata.fromJson(createWithFfiStream(datasetUri,
stream.memoryAddress(), fragmentId,
return FragmentMetadata.fromJsonArray(createWithFfiStream(datasetUri,
stream.memoryAddress(),
params.getMaxRowsPerFile(), params.getMaxRowsPerGroup(),
params.getMaxBytesPerFile(), params.getMode(), params.getStorageOptions()));
}
Expand All @@ -83,7 +80,7 @@ public static FragmentMetadata create(String datasetUri, ArrowArrayStream stream
* @return the json serialized fragment metadata
*/
private static native String createWithFfiArray(String datasetUri,
long arrowArrayMemoryAddress, long arrowSchemaMemoryAddress, Optional<Integer> fragmentId,
long arrowArrayMemoryAddress, long arrowSchemaMemoryAddress,
Optional<Integer> maxRowsPerFile, Optional<Integer> maxRowsPerGroup,
Optional<Long> maxBytesPerFile, Optional<String> mode, Map<String, String> storageOptions);

Expand All @@ -93,7 +90,7 @@ private static native String createWithFfiArray(String datasetUri,
* @return the json serialized fragment metadata
*/
private static native String createWithFfiStream(String datasetUri, long arrowStreamMemoryAddress,
Optional<Integer> fragmentId, Optional<Integer> maxRowsPerFile,
Optional<Integer> maxRowsPerFile,
Optional<Integer> maxRowsPerGroup, Optional<Long> maxBytesPerFile,
Optional<String> mode, Map<String, String> storageOptions);
}
27 changes: 27 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/FragmentMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package com.lancedb.lance;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.util.Preconditions;
import org.json.JSONArray;
import org.json.JSONObject;
import org.apache.commons.lang3.builder.ToStringBuilder;

Expand Down Expand Up @@ -75,4 +79,27 @@ public static FragmentMetadata fromJson(String jsonMetadata) {
return new FragmentMetadata(jsonMetadata, metadata.getInt(ID_KEY),
metadata.getLong(PHYSICAL_ROWS_KEY));
}

/**
 * Converts a JSON array string into a list of FragmentMetadata objects.
 *
 * @param jsonMetadata A JSON array string containing fragment metadata.
 * @return A list of FragmentMetadata objects, one per array entry.
 * @throws IllegalArgumentException if any entry lacks the id or physical-rows key.
 */
public static List<FragmentMetadata> fromJsonArray(String jsonMetadata) {
  Preconditions.checkNotNull(jsonMetadata);
  JSONArray metadatas = new JSONArray(jsonMetadata);
  // Presize: the output has exactly one element per JSON array entry.
  List<FragmentMetadata> fragmentMetadataList = new ArrayList<>(metadatas.length());
  for (Object object : metadatas) {
    JSONObject metadata = (JSONObject) object;
    if (!metadata.has(ID_KEY) || !metadata.has(PHYSICAL_ROWS_KEY)) {
      // String.format uses %s placeholders; the previous {}-style placeholders
      // were never interpolated, producing a useless error message.
      throw new IllegalArgumentException(
          String.format("Fragment metadata must have %s and %s but is %s",
              ID_KEY, PHYSICAL_ROWS_KEY, jsonMetadata));
    }
    fragmentMetadataList.add(new FragmentMetadata(metadata.toString(), metadata.getInt(ID_KEY),
        metadata.getLong(PHYSICAL_ROWS_KEY)));
  }
  return fragmentMetadataList;
}
}
36 changes: 28 additions & 8 deletions java/core/src/test/java/com/lancedb/lance/FragmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -37,7 +37,7 @@ void testFragmentCreateFfiArray() {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
testDataset.createNewFragment(123, 20);
testDataset.createNewFragment(20);
}
}

Expand All @@ -47,9 +47,8 @@ void testFragmentCreate() throws Exception {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int fragmentId = 312;
int rowCount = 21;
FragmentMetadata fragmentMeta = testDataset.createNewFragment(fragmentId, rowCount);
FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount);

// Commit fragment
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(fragmentMeta));
Expand All @@ -58,8 +57,7 @@ void testFragmentCreate() throws Exception {
assertEquals(2, dataset.latestVersion());
assertEquals(rowCount, dataset.countRows());
DatasetFragment fragment = dataset.getFragments().get(0);
assertEquals(fragmentId, fragment.getId());


try (LanceScanner scanner = fragment.newScan()) {
Schema schemaRes = scanner.schema();
assertEquals(testDataset.getSchema(), schemaRes);
Expand All @@ -74,7 +72,7 @@ void commitWithoutVersion() {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
FragmentMetadata meta = testDataset.createNewFragment(123, 20);
FragmentMetadata meta = testDataset.createNewFragment(20);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(meta));
assertThrows(IllegalArgumentException.class, () -> {
Dataset.commit(allocator, datasetPath, appendOp, Optional.empty());
Expand All @@ -88,7 +86,7 @@ void commitOldVersion() {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
FragmentMetadata meta = testDataset.createNewFragment(123, 20);
FragmentMetadata meta = testDataset.createNewFragment(20);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(meta));
assertThrows(IllegalArgumentException.class, () -> {
Dataset.commit(allocator, datasetPath, appendOp, Optional.of(0L));
Expand All @@ -107,4 +105,26 @@ void appendWithoutFragment() {
});
}
}

@Test
void testEmptyFragments() {
  // An empty write must produce an empty fragment list rather than throwing.
  String path = tempDir.resolve("testEmptyFragments").toString();
  try (RootAllocator bufferAllocator = new RootAllocator(Long.MAX_VALUE)) {
    TestUtils.SimpleTestDataset dataset =
        new TestUtils.SimpleTestDataset(bufferAllocator, path);
    dataset.createEmptyDataset().close();
    List<FragmentMetadata> created = dataset.createNewFragment(0, 10);
    assertEquals(0, created.size());
  }
}

@Test
void testMultiFragments() {
  // 20 rows at 10 rows per file must be split into exactly two fragments.
  String path = tempDir.resolve("testMultiFragments").toString();
  try (RootAllocator bufferAllocator = new RootAllocator(Long.MAX_VALUE)) {
    TestUtils.SimpleTestDataset dataset =
        new TestUtils.SimpleTestDataset(bufferAllocator, path);
    dataset.createEmptyDataset().close();
    List<FragmentMetadata> created = dataset.createNewFragment(20, 10);
    assertEquals(2, created.size());
  }
}
}
30 changes: 14 additions & 16 deletions java/core/src/test/java/com/lancedb/lance/ScannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,16 @@ void testScanFragment() throws Exception {
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int[] fragment0 = new int[]{0, 3};
int[] fragment1 = new int[]{1, 5};
int[] fragment2 = new int[]{2, 7};
FragmentMetadata metadata0 = testDataset.createNewFragment(fragment0[0], fragment0[1]);
FragmentMetadata metadata1 = testDataset.createNewFragment(fragment1[0], fragment1[1]);
FragmentMetadata metadata2 = testDataset.createNewFragment(fragment2[0], fragment2[1]);
FragmentMetadata metadata0 = testDataset.createNewFragment(3);
FragmentMetadata metadata1 = testDataset.createNewFragment(5);
FragmentMetadata metadata2 = testDataset.createNewFragment(7);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(metadata0, metadata1, metadata2));
try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) {
validScanResult(dataset, fragment0[0], fragment0[1]);
validScanResult(dataset, fragment1[0], fragment1[1]);
validScanResult(dataset, fragment2[0], fragment2[1]);
List<DatasetFragment> frags = dataset.getFragments();
assertEquals(3, frags.size());
validScanResult(dataset, frags.get(0).getId(), 3);
validScanResult(dataset, frags.get(1).getId(), 5);
validScanResult(dataset, frags.get(2).getId(), 7);
}
}
}
Expand All @@ -246,15 +245,14 @@ void testScanFragments() throws Exception {
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int[] fragment0 = new int[]{0, 3};
int[] fragment1 = new int[]{1, 5};
int[] fragment2 = new int[]{2, 7};
FragmentMetadata metadata0 = testDataset.createNewFragment(fragment0[0], fragment0[1]);
FragmentMetadata metadata1 = testDataset.createNewFragment(fragment1[0], fragment1[1]);
FragmentMetadata metadata2 = testDataset.createNewFragment(fragment2[0], fragment2[1]);
FragmentMetadata metadata0 = testDataset.createNewFragment(3);
FragmentMetadata metadata1 = testDataset.createNewFragment(5);
FragmentMetadata metadata2 = testDataset.createNewFragment(7);
FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(metadata0, metadata1, metadata2));
try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) {
try (Scanner scanner = dataset.newScan(new ScanOptions.Builder().batchSize(1024).fragmentIds(Arrays.asList(1, 2)).build())) {
List<DatasetFragment> frags = dataset.getFragments();
assertEquals(3, frags.size());
try (Scanner scanner = dataset.newScan(new ScanOptions.Builder().batchSize(1024).fragmentIds(Arrays.asList(frags.get(1).getId(), frags.get(2).getId())).build())) {
try (ArrowReader reader = scanner.scanBatches()) {
assertEquals(dataset.getSchema().getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
int rowcount = 0;
Expand Down
Loading

0 comments on commit 39222ec

Please sign in to comment.