Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added support to write iceberg tables #5989

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8c81883
Initial commit
malhotrashivam Aug 27, 2024
758c1f3
Added type info map and modified instructions class hierarchy
malhotrashivam Aug 28, 2024
48cb8d8
Minor tweaks to the instructions class hierarchy
malhotrashivam Aug 28, 2024
244bc99
Merged writeTable and appendTable into addPartition
malhotrashivam Aug 29, 2024
09340c2
Split IcebergParquetWriteInstructions into WriteInstr and ParquetWrit…
malhotrashivam Aug 30, 2024
33b60e2
Merge branch 'main' into sm-ice-write
malhotrashivam Sep 25, 2024
c70b50e
Resolving more conflicts
malhotrashivam Sep 25, 2024
cd278ab
Merge branch 'main' into sm-ice-write
malhotrashivam Sep 30, 2024
689e8a1
Added unit tests and moved Iceberg tests to Junit5
malhotrashivam Sep 30, 2024
d7f2c81
Preparing change for code review Part 1
malhotrashivam Oct 1, 2024
131a552
Preparing for review Part 2
malhotrashivam Oct 1, 2024
3021585
Added more unit tests
malhotrashivam Oct 1, 2024
9f82ba0
Review with Larry part 1
malhotrashivam Oct 3, 2024
cbae64e
Fix for failing job
malhotrashivam Oct 3, 2024
7de59b0
Review with Larry Part 2
malhotrashivam Oct 3, 2024
83a0b14
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 3, 2024
c83ddbd
Review with Devin Part 1
malhotrashivam Oct 7, 2024
f0f86cc
Fix for failing jobs
malhotrashivam Oct 7, 2024
adb21e9
Review with Devin Part 2
malhotrashivam Oct 8, 2024
744ce60
Review with Devin Part 3
malhotrashivam Oct 8, 2024
a8252ce
Review with Devin Part 4
malhotrashivam Oct 8, 2024
38f55f0
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 14, 2024
5a64faf
Minor tweaks
malhotrashivam Oct 14, 2024
ba70f1a
More tweaks
malhotrashivam Oct 14, 2024
96db353
Updated some comments
malhotrashivam Oct 14, 2024
6e2c233
Updated javadoc and added new tests
malhotrashivam Oct 15, 2024
de6eba0
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 17, 2024
0ebeba2
Review with Ryan Part 1
malhotrashivam Oct 17, 2024
31f46ba
Review with Ryan Part 2
malhotrashivam Oct 18, 2024
946def0
Fix for failing parquet reads
malhotrashivam Oct 18, 2024
bd8535c
Added more tests for writeDataFile
malhotrashivam Oct 18, 2024
78bd605
Added tests for on write callback
malhotrashivam Oct 18, 2024
500cfe6
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 21, 2024
e2aba1f
Added support for writing partitioned tables
malhotrashivam Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Optional;

import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class IcebergParquetWriteInstructionsTest {
Expand All @@ -18,7 +19,8 @@ void defaults() {
final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder().build();
assertThat(instructions.tableDefinition().isEmpty()).isTrue();
assertThat(instructions.dataInstructions().isEmpty()).isTrue();
assertThat(instructions.columnRenames().isEmpty()).isTrue();
assertThat(instructions.dhToParquetColumnRenames().isEmpty()).isTrue();
assertThat(instructions.dhToIcebergColumnRenames().isEmpty()).isTrue();
assertThat(instructions.createTableIfNotExist()).isFalse();
assertThat(instructions.verifySchema()).isEmpty();
assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY");
Expand Down Expand Up @@ -82,10 +84,12 @@ void testSetTargetPageSize() {

@Test
void testMinMaximumDictionaryKeys() {

try {
IcebergParquetWriteInstructions.builder()
.maximumDictionaryKeys(-1)
.build();
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("maximumDictionaryKeys");
}
Expand All @@ -97,6 +101,7 @@ void testMinMaximumDictionarySize() {
IcebergParquetWriteInstructions.builder()
.maximumDictionarySize(-1)
.build();
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("maximumDictionarySize");
}
Expand All @@ -108,22 +113,78 @@ void testMinTargetPageSize() {
IcebergParquetWriteInstructions.builder()
.targetPageSize(1024)
.build();
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("targetPageSize");
}
}

@Test
void testSetToIcebergColumnRename() {
    // Renames registered one pair at a time via putDhToIcebergColumnRenames.
    final IcebergParquetWriteInstructions individualPuts = IcebergParquetWriteInstructions.builder()
            .putDhToIcebergColumnRenames("dh1", "ice1")
            .putDhToIcebergColumnRenames("dh2", "ice2")
            .build();
    final Map<String, String> twoRenames = individualPuts.dhToIcebergColumnRenames();
    assertThat(twoRenames.size()).isEqualTo(2);
    assertThat(twoRenames.get("dh1")).isEqualTo("ice1");
    assertThat(twoRenames.get("dh2")).isEqualTo("ice2");

    // Renames registered in bulk via putAllDhToIcebergColumnRenames.
    final IcebergParquetWriteInstructions bulkPut = IcebergParquetWriteInstructions.builder()
            .putAllDhToIcebergColumnRenames(Map.of(
                    "dh1", "ice1",
                    "dh2", "ice2",
                    "dh3", "ice3"))
            .build();
    final Map<String, String> threeRenames = bulkPut.dhToIcebergColumnRenames();
    assertThat(threeRenames.size()).isEqualTo(3);
    assertThat(threeRenames.get("dh1")).isEqualTo("ice1");
    assertThat(threeRenames.get("dh2")).isEqualTo("ice2");
    assertThat(threeRenames.get("dh3")).isEqualTo("ice3");
}

@Test
void testSetToParquetColumnRename() {
    // Renames registered one pair at a time via putDhToParquetColumnRenames.
    final IcebergParquetWriteInstructions individualPuts = IcebergParquetWriteInstructions.builder()
            .putDhToParquetColumnRenames("dh1", "parquet1")
            .putDhToParquetColumnRenames("dh2", "parquet2")
            .build();
    final Map<String, String> twoRenames = individualPuts.dhToParquetColumnRenames();
    assertThat(twoRenames.size()).isEqualTo(2);
    assertThat(twoRenames.get("dh1")).isEqualTo("parquet1");
    assertThat(twoRenames.get("dh2")).isEqualTo("parquet2");

    // Renames registered in bulk via putAllDhToParquetColumnRenames.
    final IcebergParquetWriteInstructions bulkPut = IcebergParquetWriteInstructions.builder()
            .putAllDhToParquetColumnRenames(Map.of(
                    "dh1", "parquet1",
                    "dh2", "parquet2",
                    "dh3", "parquet3"))
            .build();
    final Map<String, String> threeRenames = bulkPut.dhToParquetColumnRenames();
    assertThat(threeRenames.size()).isEqualTo(3);
    assertThat(threeRenames.get("dh1")).isEqualTo("parquet1");
    assertThat(threeRenames.get("dh2")).isEqualTo("parquet2");
    assertThat(threeRenames.get("dh3")).isEqualTo("parquet3");
}

@Test
void toParquetInstructionTest() {
final IcebergParquetWriteInstructions icebergInstructions = IcebergParquetWriteInstructions.builder()
final IcebergParquetWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder()
.putDhToIcebergColumnRenames("dh1", "ice1")
.putDhToParquetColumnRenames("dh2", "parquet1")
.compressionCodecName("GZIP")
.maximumDictionaryKeys(100)
.maximumDictionarySize(200)
.targetPageSize(1024 * 1024)
.build();
final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
final ParquetInstructions parquetInstructions = icebergInstructions.toParquetInstructions(
final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions(
null, fieldIdToName);

assertThat(parquetInstructions.getParquetColumnNameFromColumnNameOrDefault("dh1")).isEqualTo("dh1");
assertThat(parquetInstructions.getParquetColumnNameFromColumnNameOrDefault("ice1")).isEqualTo("ice1");
assertThat(parquetInstructions.getParquetColumnNameFromColumnNameOrDefault("dh2")).isEqualTo("parquet1");

assertThat(parquetInstructions.getColumnNameFromParquetColumnNameOrDefault("dh1")).isEqualTo("dh1");
assertThat(parquetInstructions.getColumnNameFromParquetColumnNameOrDefault("ice1")).isEqualTo("ice1");
assertThat(parquetInstructions.getColumnNameFromParquetColumnNameOrDefault("parquet1")).isEqualTo("dh2");

assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
assertThat(parquetInstructions.getMaximumDictionarySize()).isEqualTo(200);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ abstract class IcebergToolsTest {
ColumnDefinition.ofString("ColumnType"),
ColumnDefinition.ofBoolean("IsPartitioning"));

private IcebergInstructions instructions;
private IcebergReadInstructions instructions;

public abstract S3AsyncClient s3AsyncClient();

Expand Down Expand Up @@ -126,13 +126,14 @@ void setUp() throws Exception {

final S3Instructions s3Instructions = s3Instructions(S3Instructions.builder()).build();

instructions = IcebergInstructions.builder()
instructions = IcebergReadInstructions.builder()
.dataInstructions(s3Instructions)
.build();
}

@AfterEach
void tearDown() throws ExecutionException, InterruptedException {
void tearDown() throws Exception {
engineCleanup.tearDown();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
for (String key : keys) {
asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get();
}
Expand Down Expand Up @@ -338,7 +339,7 @@ void testOpenTableS3Only() throws ExecutionException, InterruptedException, Time
void testOpenTableDefinition() throws ExecutionException, InterruptedException, TimeoutException {
uploadSalesPartitioned();

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(SALES_PARTITIONED_DEFINITION)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand All @@ -365,7 +366,7 @@ void testOpenTablePartitionTypeException() {
ColumnDefinition.ofLong("Unit_Price"),
ColumnDefinition.ofTime("Order_Date"));

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(tableDef)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand Down Expand Up @@ -401,7 +402,7 @@ void testOpenTableDefinitionRename() throws ExecutionException, InterruptedExcep
ColumnDefinition.ofDouble("UnitPrice"),
ColumnDefinition.ofTime("OrderDate"));

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(renamed)
.dataInstructions(instructions.dataInstructions().get())
.putColumnRenames("Region", "RegionName")
Expand Down Expand Up @@ -437,7 +438,7 @@ void testSkippedPartitioningColumn() throws ExecutionException, InterruptedExcep
ColumnDefinition.ofDouble("Unit_Price"),
ColumnDefinition.ofTime("Order_Date"));

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(tableDef)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand Down Expand Up @@ -466,7 +467,7 @@ void testReorderedPartitioningColumn() throws ExecutionException, InterruptedExc
ColumnDefinition.ofDouble("Unit_Price"),
ColumnDefinition.ofTime("Order_Date"));

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(tableDef)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand All @@ -486,7 +487,7 @@ void testReorderedPartitioningColumn() throws ExecutionException, InterruptedExc
void testZeroPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException {
uploadSalesPartitioned();

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(SALES_MULTI_DEFINITION)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand All @@ -513,7 +514,7 @@ void testIncorrectPartitioningColumns() throws ExecutionException, InterruptedEx
ColumnDefinition.ofDouble("Unit_Price"),
ColumnDefinition.ofTime("Order_Date"));

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(tableDef)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand Down Expand Up @@ -548,7 +549,7 @@ void testMissingPartitioningColumns() {
ColumnDefinition.ofLong("Unit_Price"),
ColumnDefinition.ofTime("Order_Date"));

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(tableDef)
.dataInstructions(instructions.dataInstructions().get())
.build();
Expand All @@ -575,7 +576,7 @@ void testMissingPartitioningColumns() {
void testOpenTableColumnRename() throws ExecutionException, InterruptedException, TimeoutException {
uploadSalesPartitioned();

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.putColumnRenames("Region", "RegionName")
.putColumnRenames("Item_Type", "ItemType")
Expand All @@ -595,7 +596,7 @@ void testOpenTableColumnRename() throws ExecutionException, InterruptedException
void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException {
uploadSalesRenamed();

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.build();

Expand All @@ -615,7 +616,7 @@ void testOpenTableColumnLegalizationRename()
throws ExecutionException, InterruptedException, TimeoutException {
uploadSalesRenamed();

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.putColumnRenames("Item&Type", "Item_Type")
.putColumnRenames("Units/Sold", "Units_Sold")
Expand Down Expand Up @@ -645,7 +646,7 @@ void testOpenTableColumnLegalizationPartitionException() {
ColumnDefinition.ofInt("Year").withPartitioning(),
ColumnDefinition.ofInt("Month").withPartitioning());

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.tableDefinition(tableDef)
.putColumnRenames("Year", "Current Year")
.putColumnRenames("Month", "Current Month")
Expand Down Expand Up @@ -675,7 +676,7 @@ void testOpenTableColumnRenamePartitioningColumns()
throws ExecutionException, InterruptedException, TimeoutException {
uploadSalesPartitioned();

final IcebergInstructions localInstructions = IcebergInstructions.builder()
final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.putColumnRenames("VendorID", "vendor_id")
.putColumnRenames("month", "__month")
Expand Down Expand Up @@ -858,7 +859,7 @@ void testTableDefinitionTable() {
void testTableDefinitionWithInstructions() {
final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog);

IcebergInstructions localInstructions = IcebergInstructions.builder()
IcebergReadInstructions localInstructions = IcebergReadInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.putColumnRenames("Region", "Area")
.putColumnRenames("Item_Type", "ItemType")
Expand All @@ -885,7 +886,7 @@ void testTableDefinitionWithInstructions() {
ColumnDefinition.ofString("Item_Type"),
ColumnDefinition.ofTime("Order_Date"));

localInstructions = IcebergInstructions.builder()
localInstructions = IcebergReadInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.tableDefinition(userTableDef)
.build();
Expand Down
Loading