diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
index 1043e3e7205c..d46832b5f64c 100644
--- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -60,6 +60,11 @@ public static Catalog wrap(
     return new CachingCatalog(catalog, caseSensitive, expirationIntervalMillis);
   }
 
+  /** Returns true if the wrapped catalog is an instance of the given class. */
+  public boolean isWrappedCatalogInstance(Class<?> cls) {
+    return cls.isInstance(catalog);
+  }
+
   private final Catalog catalog;
   private final boolean caseSensitive;
 
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index 339c59b45d1b..81777275cfac 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -78,6 +78,14 @@ private CatalogProperties() {}
 
   public static final boolean IO_MANIFEST_CACHE_ENABLED_DEFAULT = false;
 
+  /**
+   * Controls whether an engine using a REST catalog should delegate drop-table purge requests to
+   * the catalog. Defaults to false, in which case the engine uses its own purge implementation.
+   */
+  public static final String REST_CATALOG_PURGE = "rest.catalog-purge";
+
+  public static final boolean REST_CATALOG_PURGE_DEFAULT = false;
+
   /**
    * Controls the maximum duration for which an entry stays in the manifest cache.
    *
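
Note: since `rest.catalog-purge` is a catalog property, Spark users would set it with the
`spark.sql.catalog.<name>.` prefix. A minimal sketch of enabling it, assuming a catalog named
`my_catalog` backed by `RESTCatalog` (the catalog name and URI below are placeholders, not part
of this change):

    // Illustrative configuration only; "my_catalog" and the URI are placeholders.
    SparkSession spark =
        SparkSession.builder()
            .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
            .config("spark.sql.catalog.my_catalog.uri", "http://localhost:8181")
            // the property added above: delegate DROP TABLE ... PURGE to the REST catalog
            .config("spark.sql.catalog.my_catalog.rest.catalog-purge", "true")
            .getOrCreate();
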
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 0c361598623e..cfb10291df50 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark;
 
+import static org.apache.iceberg.CatalogProperties.REST_CATALOG_PURGE;
+import static org.apache.iceberg.CatalogProperties.REST_CATALOG_PURGE_DEFAULT;
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
@@ -60,6 +62,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTSessionCatalog;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkChangelogTable;
 import org.apache.iceberg.spark.source.SparkTable;
@@ -137,6 +141,7 @@ public class SparkCatalog extends BaseCatalog
   private ViewCatalog asViewCatalog = null;
   private String[] defaultNamespace = null;
   private HadoopTables tables;
+  private boolean restCatalogPurge;
 
   /**
    * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter.
@@ -365,6 +370,14 @@ public boolean purgeTable(Identifier ident) {
       String metadataFileLocation =
           ((HasTableOperations) table).operations().current().metadataFileLocation();
 
+      if ((this.icebergCatalog instanceof RESTCatalog
+              || this.icebergCatalog instanceof RESTSessionCatalog
+              || (this.icebergCatalog instanceof CachingCatalog
+                  && ((CachingCatalog) this.icebergCatalog).isWrappedCatalogInstance(RESTCatalog.class)))
+          && this.restCatalogPurge) {
+        return dropTableWithPurging(ident);
+      }
+
       boolean dropped = dropTableWithoutPurging(ident);
 
       if (dropped) {
@@ -383,6 +396,14 @@ public boolean purgeTable(Identifier ident) {
     }
   }
 
+  private boolean dropTableWithPurging(Identifier ident) {
+    if (isPathIdentifier(ident)) {
+      return tables.dropTable(((PathIdentifier) ident).location(), true /* purge data */);
+    } else {
+      return icebergCatalog.dropTable(buildIdentifier(ident), true /* purge data */);
+    }
+  }
+
   private boolean dropTableWithoutPurging(Identifier ident) {
     if (isPathIdentifier(ident)) {
       return tables.dropTable(((PathIdentifier) ident).location(), false /* don't purge data */);
@@ -744,6 +765,9 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
             CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
             CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
 
+    this.restCatalogPurge =
+        PropertyUtil.propertyAsBoolean(options, REST_CATALOG_PURGE, REST_CATALOG_PURGE_DEFAULT);
+
     // An expiration interval of 0ms effectively disables caching.
     // Do not wrap with CachingCatalog.
     if (cacheExpirationIntervalMs == 0) {
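
The condition in `purgeTable` above delegates only when both the config flag is set and the
underlying catalog is REST-backed, unwrapping one level of `CachingCatalog`. A standalone
restatement of that decision, with a hypothetical helper name (`delegatesPurgeToCatalog` is
illustrative, not part of this patch):

    // Hypothetical helper restating the purgeTable() check; not part of the patch.
    static boolean delegatesPurgeToCatalog(Catalog catalog, boolean restCatalogPurge) {
      boolean restBacked =
          catalog instanceof RESTCatalog
              || catalog instanceof RESTSessionCatalog
              || (catalog instanceof CachingCatalog
                  && ((CachingCatalog) catalog).isWrappedCatalogInstance(RESTCatalog.class));
      // both the flag and a REST-backed catalog are required to delegate
      return restCatalogPurge && restBacked;
    }

Note that the `CachingCatalog` branch only checks for a wrapped `RESTCatalog`; a cached
`RESTSessionCatalog` would still fall through to Spark's client-side purge.
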
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestDropPurgeTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestDropPurgeTable.java
new file mode 100644
index 000000000000..5cb27cfe858f
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestDropPurgeTable.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
+import static org.apache.iceberg.CatalogProperties.REST_CATALOG_PURGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Column;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRestDropPurgeTable extends TestBase {
+
+  private static final Identifier IDENT = Identifier.of(new String[] {"a"}, "db");
+
+  private SparkCatalog sparkCatalog;
+
+  @Parameter(index = 0)
+  private boolean purge;
+
+  @Parameters(name = "purge = {0}")
+  protected static Object[][] parameters() {
+    return new Object[][] {{true}, {false}};
+  }
+
+  @BeforeEach
+  public void before() {
+    sparkCatalog = new SparkCatalog();
+    sparkCatalog.initialize(
+        "test_rest_catalog_purge",
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(
+                CATALOG_IMPL,
+                MockCatalog.class.getName(),
+                REST_CATALOG_PURGE,
+                Boolean.toString(purge),
+                MockCatalog.EXPECTATION,
+                Boolean.toString(purge))));
+    try {
+      sparkCatalog.createNamespace(new String[] {"a"}, ImmutableMap.of());
+      sparkCatalog.createTable(
+          IDENT, new Column[] {Column.create("a", DataTypes.FloatType)}, null, ImmutableMap.of());
+    } catch (TableAlreadyExistsException
+        | NoSuchNamespaceException
+        | NamespaceAlreadyExistsException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @TestTemplate
+  public void testDropPurgeTableForwardsPurge() {
+    boolean dropped = sparkCatalog.purgeTable(IDENT);
+    // assert the drop succeeded to confirm the purgeTable code path under test actually ran
+    assertThat(dropped).isTrue();
+  }
+
+  public static class MockCatalog extends RESTCatalog {
+    private static final String EXPECTATION = "expectation";
+
+    private final InMemoryCatalog catalog;
+    private boolean expectation;
+
+    public MockCatalog() {
+      this.catalog = new InMemoryCatalog();
+      this.catalog.initialize("test", new HashMap<>());
+    }
+
+    @Override
+    public void initialize(String name, Map<String, String> props) {
+      // intentionally skip super.initialize() so the mock never contacts a REST server
+      this.expectation = Boolean.parseBoolean(props.get(EXPECTATION));
+    }
+
+    @Override
+    public Table createTable(TableIdentifier identifier, Schema schema) {
+      return catalog.createTable(identifier, schema);
+    }
+
+    @Override
+    public void createNamespace(Namespace ns, Map<String, String> props) {
+      catalog.createNamespace(ns, props);
+    }
+
+    @Override
+    public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
+      return catalog.buildTable(identifier, schema);
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+      // the purge flag forwarded by SparkCatalog must match the configured expectation
+      assertThat(purge).isEqualTo(expectation);
+      return true;
+    }
+
+    @Override
+    public Table loadTable(TableIdentifier identifier) {
+      return catalog.loadTable(identifier);
+    }
+  }
+}
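
End to end, the behavior this test simulates corresponds to the following usage, assuming a
catalog configured as sketched earlier (`my_catalog` remains a placeholder):

    // With rest.catalog-purge=true, Spark forwards dropTable(ident, true /* purge */) to the
    // REST catalog instead of deleting data and metadata files itself.
    spark.sql("DROP TABLE my_catalog.db.sample PURGE");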