Add Support for Iceberg table sort orders
evanvdia committed Jan 11, 2025
1 parent 5d63246 commit 49b0b77
Showing 34 changed files with 1,235 additions and 86 deletions.
42 changes: 41 additions & 1 deletion presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -258,7 +258,7 @@ Property Name Description

Example: ``hdfs://nn:8020/warehouse/path``
This property is required if the ``iceberg.catalog.type`` is
``hadoop``.
``hadoop``. Otherwise, it will be ignored.

``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
required if the ``iceberg.catalog.type`` is ``hadoop``.
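For reference, a minimal catalog properties file using these settings might look
like the following sketch (``connector.name`` is the standard Presto catalog
property; the warehouse path reuses the example value above)::

    connector.name=iceberg
    iceberg.catalog.type=hadoop
    iceberg.catalog.warehouse=hdfs://nn:8020/warehouse/path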
@@ -1833,3 +1833,43 @@ Map of PrestoDB types to the relevant Iceberg types:


No other types are supported.


Sorted Tables
^^^^^^^^^^^^^

The Iceberg connector supports the creation of sorted tables.
Data in the Iceberg table is sorted as each file is written.

Sorted Iceberg tables can decrease query times in many cases; the benefit depends on the query shape and cluster configuration.
Sorting is particularly beneficial when the sorted columns have a
high cardinality and are used as a filter for selective reads.

Configure the sort order with the ``sorted_by`` table property, which takes an array of
one or more columns to sort by.
The following example creates a table with the ``sorted_by`` property and sorts each
written file by the ``join_date`` field.

.. code-block:: text

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
sorted_by = ARRAY['join_date']
)

Sorting can be combined with partitioning on the same column. For example::

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
partitioning = ARRAY['month(join_date)'],
sorted_by = ARRAY['join_date']
)
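
A query that filters on the sorted column can then skip much of the data using
Iceberg's per-file min/max statistics. For example, a selective filter on
``join_date`` (an illustrative query, not from the original documentation)::

    SELECT emp_name
    FROM emp.employees.employee
    WHERE join_date BETWEEN DATE '2023-01-01' AND DATE '2023-03-31'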

@@ -30,7 +30,6 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@@ -279,20 +278,6 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThreshold)
return this;
}

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
{
return writerSortBufferSize;
}

@Config("hive.writer-sort-buffer-size")
public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
{
this.writerSortBufferSize = writerSortBufferSize;
return this;
}

@Min(1)
public int getMaxConcurrentFileRenames()
{
@@ -695,22 +680,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
this.maxPartitionsPerWriter = maxPartitionsPerWriter;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}

public int getWriteValidationThreads()
{
return writeValidationThreads;
@@ -130,7 +130,7 @@ public void configure(Binder binder)
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class);
configBinder(binder).bindConfig(HiveClientConfig.class);

configBinder(binder).bindConfig(SortingFileWriterConfig.class);
binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);
@@ -91,6 +91,7 @@ public HivePageSinkProvider(
TypeManager typeManager,
HiveClientConfig hiveClientConfig,
MetastoreClientConfig metastoreClientConfig,
SortingFileWriterConfig sortingFileWriterConfig,
LocationService locationService,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
@@ -110,8 +111,8 @@
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter();
this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.immutablePartitions = hiveClientConfig.isImmutablePartitions();
this.locationService = requireNonNull(locationService, "locationService is null");
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));
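These two settings drive the writer's external sort: rows are buffered in memory up
to writer-sort-buffer-size, spilled to sorted temporary files, and the temporary
files are then merged while reading at most max-open-sort-files of them in one pass
(see the removed hive.max-open-sort-files description above). A self-contained
sketch of that capped multi-pass merge, for illustration only; this is not Presto's
actual SortingFileWriter implementation:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public final class ExternalMergeSketch
{
    private ExternalMergeSketch() {}

    // Merges sorted runs (stand-ins for spilled temporary files) without ever
    // reading more than maxOpenSortFiles runs in a single pass.
    // maxOpenSortFiles is assumed to be >= 2, matching the @Min(2) bound.
    public static List<Integer> mergeRuns(List<List<Integer>> sortedRuns, int maxOpenSortFiles)
    {
        List<List<Integer>> runs = new ArrayList<>(sortedRuns);
        while (runs.size() > 1) {
            // Each pass combines groups of at most maxOpenSortFiles runs into
            // fewer, larger runs, until a single sorted run remains.
            List<List<Integer>> nextPass = new ArrayList<>();
            for (int i = 0; i < runs.size(); i += maxOpenSortFiles) {
                int end = Math.min(i + maxOpenSortFiles, runs.size());
                nextPass.add(kWayMerge(runs.subList(i, end)));
            }
            runs = nextPass;
        }
        return runs.isEmpty() ? new ArrayList<>() : runs.get(0);
    }

    private static List<Integer> kWayMerge(List<List<Integer>> runs)
    {
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (List<Integer> run : runs) {
            iterators.add(run.iterator());
        }
        // Heap entries are {value, runIndex}; runIndex tells us which iterator
        // to advance once the smallest value has been taken.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < iterators.size(); i++) {
            if (iterators.get(i).hasNext()) {
                heap.add(new int[] {iterators.get(i).next(), i});
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] smallest = heap.poll();
            merged.add(smallest[0]);
            if (iterators.get(smallest[1]).hasNext()) {
                heap.add(new int[] {iterators.get(smallest[1]).next(), smallest[1]});
            }
        }
        return merged;
    }
}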
@@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class SortingFileWriterConfig
{
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
private int maxOpenSortFiles = 50;

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
{
return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
@ConfigDescription("Defines how much memory is used for this in-memory sorting process.")
public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
{
this.writerSortBufferSize = writerSortBufferSize;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("max-open-sort-files")
@ConfigDescription("When writing, the maximum number of temporary files opened at one time to write sorted data.")
public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}
}
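Note that the property names no longer carry the hive. prefix (compare the removed
hive.writer-sort-buffer-size and hive.max-open-sort-files above), presumably so the
same config can be bound by other connectors such as Iceberg. A quick usage sketch
with arbitrary values:

import io.airlift.units.DataSize;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

// Arbitrary example values; the defaults are 64MB and 50.
SortingFileWriterConfig config = new SortingFileWriterConfig()
        .setWriterSortBufferSize(new DataSize(128, MEGABYTE)) // within the 1MB-1GB bounds enforced at config binding
        .setMaxOpenSortFiles(100);                            // within the 2-1000 bounds enforced at config binding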
@@ -1073,6 +1073,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfig
FUNCTION_AND_TYPE_MANAGER,
getHiveClientConfig(),
getMetastoreClientConfig(),
getSortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -1099,12 +1100,17 @@ protected HiveClientConfig getHiveClientConfig()
protected HiveClientConfig getHiveClientConfig()
{
return new HiveClientConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE))
.setTemporaryTableSchema(database)
.setCreateEmptyBucketFilesForTemporaryTable(false);
}

protected SortingFileWriterConfig getSortingFileWriterConfig()
{
return new SortingFileWriterConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE));
}

protected HiveCommonClientConfig getHiveCommonClientConfig()
{
return new HiveCommonClientConfig();
@@ -3109,7 +3115,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath
true);
assertThat(listAllDataFiles(context, path))
.filteredOn(file -> file.contains(".tmp-sort"))
.size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2);
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);

// finish the write
Collection<Slice> fragments = getFutureValue(sink.finish());
@@ -254,6 +254,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
FUNCTION_AND_TYPE_MANAGER,
config,
metastoreClientConfig,
new SortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -60,7 +60,6 @@ public void testDefaults()
.setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE))
.setSplitLoaderConcurrency(4)
.setDomainCompactionThreshold(100)
.setWriterSortBufferSize(new DataSize(64, Unit.MEGABYTE))
.setMaxConcurrentFileRenames(20)
.setMaxConcurrentZeroRowFileCreations(20)
.setRecursiveDirWalkerEnabled(false)
@@ -82,7 +81,6 @@
.setFailFastOnInsertIntoImmutablePartitionsEnabled(true)
.setSortedWritingEnabled(true)
.setMaxPartitionsPerWriter(100)
.setMaxOpenSortFiles(50)
.setWriteValidationThreads(16)
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
.setUseOrcColumnNames(false)
@@ -195,7 +193,6 @@ public void testExplicitPropertyMappings()
.put("hive.max-initial-split-size", "16MB")
.put("hive.split-loader-concurrency", "1")
.put("hive.domain-compaction-threshold", "42")
.put("hive.writer-sort-buffer-size", "13MB")
.put("hive.recursive-directories", "true")
.put("hive.storage-format", "SEQUENCEFILE")
.put("hive.compression-codec", "NONE")
@@ -207,7 +204,6 @@
.put("hive.insert-overwrite-immutable-partitions-enabled", "true")
.put("hive.fail-fast-on-insert-into-immutable-partitions-enabled", "false")
.put("hive.max-partitions-per-writers", "222")
.put("hive.max-open-sort-files", "333")
.put("hive.write-validation-threads", "11")
.put("hive.max-concurrent-file-renames", "100")
.put("hive.max-concurrent-zero-row-file-creations", "100")
@@ -313,7 +309,6 @@
.setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE))
.setSplitLoaderConcurrency(1)
.setDomainCompactionThreshold(42)
.setWriterSortBufferSize(new DataSize(13, Unit.MEGABYTE))
.setMaxConcurrentFileRenames(100)
.setMaxConcurrentZeroRowFileCreations(100)
.setRecursiveDirWalkerEnabled(true)
@@ -331,7 +326,6 @@
.setInsertOverwriteImmutablePartitionEnabled(true)
.setFailFastOnInsertIntoImmutablePartitionsEnabled(false)
.setMaxPartitionsPerWriter(222)
.setMaxOpenSortFiles(333)
.setWriteValidationThreads(11)
.setDomainSocketPath("/foo")
.setS3FileSystemType(S3FileSystemType.EMRFS)
@@ -114,22 +114,23 @@ public void testAllFormats()
throws Exception
{
HiveClientConfig config = new HiveClientConfig();
SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
File tempDir = Files.createTempDir();
try {
ExtendedHiveMetastore metastore = createTestingFileHiveMetastore(new File(tempDir, "metastore"));
for (HiveStorageFormat format : getSupportedHiveStorageFormats()) {
config.setHiveStorageFormat(format);
config.setCompressionCodec(NONE);
long uncompressedLength = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config));
long uncompressedLength = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config), sortingFileWriterConfig);
assertGreaterThan(uncompressedLength, 0L);

for (HiveCompressionCodec codec : HiveCompressionCodec.values()) {
if (codec == NONE || !codec.isSupportedStorageFormat(format)) {
continue;
}
config.setCompressionCodec(codec);
long length = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config));
long length = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config), sortingFileWriterConfig);
assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength));
}
}
@@ -152,11 +153,11 @@ private static String makeFileName(File tempDir, HiveClientConfig config)
return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getCompressionCodec().name();
}

private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath)
private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath, SortingFileWriterConfig sortingFileWriterConfig)
{
HiveTransactionHandle transaction = new HiveTransactionHandle();
HiveWriterStats stats = new HiveWriterStats();
ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats);
ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats, sortingFileWriterConfig);
List<LineItemColumn> columns = getTestColumns();
List<Type> columnTypes = columns.stream()
.map(LineItemColumn::getType)
@@ -308,7 +309,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transaction
return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE, new RuntimeStats());
}

private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats)
private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats, SortingFileWriterConfig sortingFileWriterConfig)
{
LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, Optional.empty(), NEW, DIRECT_TO_TARGET_NEW_DIRECTORY);
HiveOutputTableHandle handle = new HiveOutputTableHandle(
@@ -337,6 +338,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction
FUNCTION_AND_TYPE_MANAGER,
config,
metastoreClientConfig,
sortingFileWriterConfig,
new HiveLocationService(hdfsEnvironment),
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import org.testng.annotations.Test;

import java.util.Map;

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class TestSortingFileWriterConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(SortingFileWriterConfig.class)
.setWriterSortBufferSize(new DataSize(64, MEGABYTE))
.setMaxOpenSortFiles(50));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("writer-sort-buffer-size", "1GB")
.put("max-open-sort-files", "3")
.build();
SortingFileWriterConfig expected = new SortingFileWriterConfig()
.setWriterSortBufferSize(new DataSize(1, GIGABYTE))
.setMaxOpenSortFiles(3);
assertFullMapping(properties, expected);
}
}