Skip to content

Commit

Permalink
Kafka-connect: Handle namespace creation for auto table creation (#10186)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored May 15, 2024
1 parent 2058053 commit 4c9f47d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.iceberg.connect.data;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -83,6 +87,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
TableIdentifier identifier = TableIdentifier.parse(tableName);

createNamespaceIfNotExist(catalog, identifier.namespace());

List<String> partitionBy = config.tableConfig(tableName).partitionBy();
PartitionSpec spec;
try {
Expand Down Expand Up @@ -112,4 +118,22 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
});
return result.get();
}

/**
 * Attempts to create every level of the given namespace, outermost first.
 *
 * <p>For a namespace with levels {@code [a, b, c]} this issues {@code createNamespace} calls for
 * {@code a}, then {@code a.b}, then {@code a.b.c}. Nothing is done when the catalog does not
 * implement {@link SupportsNamespaces}.
 *
 * <p>{@link AlreadyExistsException} and {@link ForbiddenException} raised by an individual
 * creation attempt are ignored and the remaining levels are still attempted; any other exception
 * propagates to the caller.
 *
 * @param catalog catalog in which the namespaces should be created
 * @param identifierNamespace full namespace whose levels (and all ancestor prefixes) are created
 */
@VisibleForTesting
static void createNamespaceIfNotExist(Catalog catalog, Namespace identifierNamespace) {
  if (catalog instanceof SupportsNamespaces) {
    SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
    String[] parts = identifierNamespace.levels();

    // depth is the number of leading levels that make up the prefix being created
    for (int depth = 1; depth <= parts.length; depth++) {
      Namespace prefix = Namespace.of(Arrays.copyOfRange(parts, 0, depth));
      try {
        namespaceCatalog.createNamespace(prefix);
      } catch (AlreadyExistsException | ForbiddenException ignored) {
        // Deliberately ignored: creation is attempted unconditionally, even when the
        // namespace may already exist, so that a separate namespaceExists() lookup
        // is not needed before each create.
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.TableSinkConfig;
Expand All @@ -47,7 +52,7 @@ public class IcebergWriterFactoryTest {
@ValueSource(booleans = {true, false})
@SuppressWarnings("unchecked")
public void testAutoCreateTable(boolean partitioned) {
Catalog catalog = mock(Catalog.class);
Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));

TableSinkConfig tableConfig = mock(TableSinkConfig.class);
Expand All @@ -63,7 +68,7 @@ public void testAutoCreateTable(boolean partitioned) {
when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data", "foo2"));

IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
factory.autoCreateTable("db.tbl", record);
factory.autoCreateTable("foo1.foo2.foo3.bar", record);

ArgumentCaptor<TableIdentifier> identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);
ArgumentCaptor<Schema> schemaCaptor = ArgumentCaptor.forClass(Schema.class);
Expand All @@ -77,10 +82,18 @@ public void testAutoCreateTable(boolean partitioned) {
specCaptor.capture(),
propsCaptor.capture());

assertThat(identCaptor.getValue()).isEqualTo(TableIdentifier.of("db", "tbl"));
assertThat(identCaptor.getValue())
.isEqualTo(TableIdentifier.of(Namespace.of("foo1", "foo2", "foo3"), "bar"));
assertThat(schemaCaptor.getValue().findField("id").type()).isEqualTo(LongType.get());
assertThat(schemaCaptor.getValue().findField("data").type()).isEqualTo(StringType.get());
assertThat(specCaptor.getValue().isPartitioned()).isEqualTo(partitioned);
assertThat(propsCaptor.getValue()).containsKey("test-prop");

ArgumentCaptor<Namespace> namespaceCaptor = ArgumentCaptor.forClass(Namespace.class);
verify((SupportsNamespaces) catalog, times(3)).createNamespace(namespaceCaptor.capture());
List<Namespace> capturedArguments = namespaceCaptor.getAllValues();
assertThat(capturedArguments.get(0)).isEqualTo(Namespace.of("foo1"));
assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2"));
assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3"));
}
}

0 comments on commit 4c9f47d

Please sign in to comment.