Skip to content

Commit

Permalink
[BugFix] Fix bugs for iceberg reset catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Youngwb committed Jan 24, 2025
1 parent a2c62c0 commit 37e0b2a
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 70 deletions.
7 changes: 7 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ under the License.
<version>${iceberg.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5 -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.4.1</version>
</dependency>

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public boolean createView(ConnectorViewDefinition connectorViewDefinition, boole
return delegate.createView(connectorViewDefinition, replace);
}

@Override
public boolean dropView(String dbName, String viewName) {
return delegate.dropView(dbName, viewName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Database getDB(String dbName) {
@Override
public List<String> listTables(String dbName) {
Namespace ns = convertDbNameToNamespace(dbName);
List<TableIdentifier> tableIdentifiers = delegate.listTables(ns);
List<TableIdentifier> tableIdentifiers = new ArrayList<>(delegate.listTables(ns));
List<TableIdentifier> viewIdentifiers = new ArrayList<>();
try {
viewIdentifiers = delegate.listViews(ns);
Expand Down Expand Up @@ -246,8 +246,9 @@ public boolean createView(ConnectorViewDefinition definition, boolean replace) {
return true;
}

public boolean dropView(Namespace ns, String viewName) {
return delegate.dropView(TableIdentifier.of(ns, viewName));
@Override
public boolean dropView(String dbName, String viewName) {
return delegate.dropView(TableIdentifier.of(convertDbNameToNamespace(dbName), viewName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1465,73 +1465,6 @@ public void testFileWrapper() {
Assert.assertEquals(deleteFileWrapper.fileSequenceNumber(), FILE_C_1.fileSequenceNumber());
}

@Test
public void testCreateView(@Mocked RESTCatalog restCatalog, @Mocked BaseView baseView,
@Mocked ImmutableSQLViewRepresentation representation) throws Exception {
UtFrameUtils.createMinStarRocksCluster();
AnalyzeTestUtil.init();
IcebergRESTCatalog icebergRESTCatalog = new IcebergRESTCatalog(restCatalog, new Configuration());
CachingIcebergCatalog cachingIcebergCatalog = new CachingIcebergCatalog(
CATALOG_NAME, icebergRESTCatalog, DEFAULT_CATALOG_PROPERTIES, Executors.newSingleThreadExecutor());

IcebergMetadata metadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, cachingIcebergCatalog,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(),
new IcebergCatalogProperties(DEFAULT_CONFIG));

new Expectations() {
{
restCatalog.loadNamespaceMetadata(Namespace.of("db"));
result = ImmutableMap.of("location", "xxxxx");
minTimes = 1;

restCatalog.name();
result = "rest_catalog";
minTimes = 1;
}
};

CreateViewStmt stmt = new CreateViewStmt(false, false, new TableName("catalog", "db", "table"),
Lists.newArrayList(new ColWithComment("k1", "", NodePosition.ZERO)), "", false, null, NodePosition.ZERO);
stmt.setColumns(Lists.newArrayList(new Column("k1", INT)));
metadata.createView(stmt);

new Expectations() {
{
representation.sql();
result = "select * from table";
minTimes = 1;

baseView.sqlFor("starrocks");
result = representation;
minTimes = 1;

baseView.properties();
result = ImmutableMap.of("comment", "mocked");
minTimes = 1;

baseView.schema();
result = new Schema(Types.NestedField.optional(1, "k1", Types.IntegerType.get()));
minTimes = 1;

baseView.name();
result = "view";
minTimes = 1;

baseView.location();
result = "xxx";
minTimes = 1;

restCatalog.loadView(TableIdentifier.of("db", "view"));
result = baseView;
minTimes = 1;
}
};

Table table = metadata.getView("db", "view");
Assert.assertEquals(ICEBERG_VIEW, table.getType());
Assert.assertEquals("xxx", table.getTableLocation());
}

@Test
public void testVersionRange() {
TableVersionRange versionRange = TableVersionRange.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,75 @@
package com.starrocks.connector.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.IcebergView;
import com.starrocks.catalog.Table;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.iceberg.rest.IcebergRESTCatalog;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.sql.ast.ColWithComment;
import com.starrocks.sql.ast.CreateViewStmt;
import com.starrocks.sql.ast.DropTableStmt;
import com.starrocks.sql.parser.NodePosition;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.ImmutableSQLViewRepresentation;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;

import static com.starrocks.catalog.Table.TableType.ICEBERG_VIEW;
import static com.starrocks.catalog.Type.INT;
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.HIVE_METASTORE_URIS;
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.ICEBERG_CATALOG_TYPE;

public class IcebergRESTCatalogTest {
private static final String CATALOG_NAME = "iceberg_rest_catalog";
public static final IcebergCatalogProperties DEFAULT_CATALOG_PROPERTIES;
public static final Map<String, String> DEFAULT_CONFIG = new HashMap<>();
public static final HdfsEnvironment HDFS_ENVIRONMENT = new HdfsEnvironment();

static {
DEFAULT_CONFIG.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732"); // non-exist ip, prevent to connect local service
DEFAULT_CONFIG.put(ICEBERG_CATALOG_TYPE, "hive");
DEFAULT_CATALOG_PROPERTIES = new IcebergCatalogProperties(DEFAULT_CONFIG);
}

@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinStarRocksCluster();
AnalyzeTestUtil.init();
}

public IcebergMetadata buildIcebergMetadata(RESTCatalog restCatalog) {
IcebergRESTCatalog icebergRESTCatalog = new IcebergRESTCatalog(restCatalog, new Configuration());
CachingIcebergCatalog cachingIcebergCatalog = new CachingIcebergCatalog(
CATALOG_NAME, icebergRESTCatalog, DEFAULT_CATALOG_PROPERTIES, Executors.newSingleThreadExecutor());

return new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, cachingIcebergCatalog,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(),
new IcebergCatalogProperties(DEFAULT_CONFIG));
}

@Test
public void testListAllDatabases(@Mocked RESTCatalog restCatalog) {
new Expectations() {
Expand Down Expand Up @@ -77,4 +130,107 @@ public void testRenameTable(@Mocked RESTCatalog restCatalog) {
boolean exists = icebergRESTCatalog.tableExists("db", "tbl2");
Assert.assertTrue(exists);
}

@Test
public void testShowTableVies(@Mocked RESTCatalog restCatalog) {
IcebergMetadata metadata = buildIcebergMetadata(restCatalog);

new Expectations() {
{
restCatalog.listTables((Namespace) any);
result = ImmutableList.of(TableIdentifier.of("db", "tbl1"));
minTimes = 1;

restCatalog.listViews((Namespace) any);
result = ImmutableList.of(TableIdentifier.of("db", "view1"));
minTimes = 1;
}
};

List<String> tables = metadata.listTableNames("db");
Assert.assertEquals(2, tables.size());
Assert.assertEquals(tables, Lists.newArrayList("tbl1", "view1"));
}

@Test
public void testDropView(@Mocked RESTCatalog restCatalog) {
IcebergMetadata metadata = buildIcebergMetadata(restCatalog);
new MockUp<IcebergMetadata>() {
@Mock
Table getTable(String dbName, String tblName) {
return new IcebergView(1, "iceberg_rest_catalog", "db", "view",
Lists.newArrayList(), "mocked", "iceberg_rest_catalog", "db",
"location");
}
};

new Expectations() {
{
restCatalog.dropView((TableIdentifier) any);
result = true;
minTimes = 1;
}
};

metadata.dropTable(new DropTableStmt(false, new TableName("catalog", "db", "view"), false));
}

@Test
public void testCreateView(@Mocked RESTCatalog restCatalog, @Mocked BaseView baseView,
@Mocked ImmutableSQLViewRepresentation representation) throws Exception {
IcebergMetadata metadata = buildIcebergMetadata(restCatalog);

new Expectations() {
{
restCatalog.loadNamespaceMetadata(Namespace.of("db"));
result = ImmutableMap.of("location", "xxxxx");
minTimes = 1;

restCatalog.name();
result = "rest_catalog";
minTimes = 1;
}
};

CreateViewStmt stmt = new CreateViewStmt(false, false, new TableName("catalog", "db", "table"),
Lists.newArrayList(new ColWithComment("k1", "", NodePosition.ZERO)), "", false, null, NodePosition.ZERO);
stmt.setColumns(Lists.newArrayList(new Column("k1", INT)));
metadata.createView(stmt);

new Expectations() {
{
representation.sql();
result = "select * from table";
minTimes = 1;

baseView.sqlFor("starrocks");
result = representation;
minTimes = 1;

baseView.properties();
result = ImmutableMap.of("comment", "mocked");
minTimes = 1;

baseView.schema();
result = new Schema(Types.NestedField.optional(1, "k1", Types.IntegerType.get()));
minTimes = 1;

baseView.name();
result = "view";
minTimes = 1;

baseView.location();
result = "xxx";
minTimes = 1;

restCatalog.loadView(TableIdentifier.of("db", "view"));
result = baseView;
minTimes = 1;
}
};

Table table = metadata.getView("db", "view");
Assert.assertEquals(ICEBERG_VIEW, table.getType());
Assert.assertEquals("xxx", table.getTableLocation());
}
}

0 comments on commit 37e0b2a

Please sign in to comment.