From ea5da1789f3eb80fc3196bcdd787a95ac0c493ba Mon Sep 17 00:00:00 2001
From: Ozan Okumusoglu
Date: Fri, 18 Oct 2024 16:48:00 -0700
Subject: [PATCH] AWS: Switch to base2 entropy in ObjectStoreLocationProvider for optimized S3 performance (#11112)

Co-authored-by: Drew Schleit
---
 .../org/apache/iceberg/LocationProviders.java | 69 ++++++++++++++++---
 .../org/apache/iceberg/TableProperties.java   |  5 ++
 .../apache/iceberg/TestLocationProvider.java  | 40 +++++++++--
 docs/docs/aws.md                              | 26 +++++--
 docs/docs/configuration.md                    |  1 +
 5 files changed, 123 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/LocationProviders.java b/core/src/main/java/org/apache/iceberg/LocationProviders.java
index 0c4924b6d5a6..68bec2f4e4fc 100644
--- a/core/src/main/java/org/apache/iceberg/LocationProviders.java
+++ b/core/src/main/java/org/apache/iceberg/LocationProviders.java
@@ -27,7 +27,6 @@
 import org.apache.iceberg.relocated.com.google.common.hash.HashCode;
 import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
 import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
-import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
 import org.apache.iceberg.util.LocationUtil;
 import org.apache.iceberg.util.PropertyUtil;
 
@@ -108,10 +107,15 @@ public String newDataLocation(String filename) {
   static class ObjectStoreLocationProvider implements LocationProvider {
 
     private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
-    private static final BaseEncoding BASE64_ENCODER = BaseEncoding.base64Url().omitPadding();
-    private static final ThreadLocal<byte[]> TEMP = ThreadLocal.withInitial(() -> new byte[4]);
+    // Number of bits of entropy to generate in the file location
+    private static final int HASH_BINARY_STRING_BITS = 20;
+    // The generated entropy is divided into directories of this length
+    private static final int ENTROPY_DIR_LENGTH = 4;
+    // Number of directories to create from the entropy
+    private static final int ENTROPY_DIR_DEPTH = 3;
     private final String storageLocation;
     private final String context;
+    private final boolean includePartitionPaths;
 
     ObjectStoreLocationProvider(String tableLocation, Map<String, String> properties) {
       this.storageLocation =
@@ -123,6 +127,11 @@ static class ObjectStoreLocationProvider implements LocationProvider {
       } else {
         this.context = pathContext(tableLocation);
       }
+      this.includePartitionPaths =
+          PropertyUtil.propertyAsBoolean(
+              properties,
+              TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
+              TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT);
     }
 
     private static String dataLocation(Map<String, String> properties, String tableLocation) {
@@ -141,7 +150,12 @@ private static String dataLocation(Map<String, String> properties, String tableLocation) {
 
     @Override
     public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
-      return newDataLocation(String.format("%s/%s", spec.partitionToPath(partitionData), filename));
+      if (includePartitionPaths) {
+        return newDataLocation(
+            String.format("%s/%s", spec.partitionToPath(partitionData), filename));
+      } else {
+        return newDataLocation(filename);
+      }
     }
 
     @Override
@@ -150,7 +164,13 @@ public String newDataLocation(String filename) {
       if (context != null) {
         return String.format("%s/%s/%s/%s", storageLocation, hash, context, filename);
       } else {
-        return String.format("%s/%s/%s", storageLocation, hash, filename);
+        // if partition paths are included, add the last part of the entropy as a directory before the partition names
+        if (includePartitionPaths) {
+          return String.format("%s/%s/%s", storageLocation, hash, filename);
+        } else {
+          // if partition paths are not included, append the last part of the entropy to the file name with `-`
+          return String.format("%s/%s-%s", storageLocation, hash, filename);
+        }
       }
     }
 
@@ -172,10 +192,41 @@ private static String pathContext(String tableLocation) {
     }
 
     private String computeHash(String fileName) {
-      byte[] bytes = TEMP.get();
-      HashCode hash = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
-      hash.writeBytesTo(bytes, 0, 4);
-      return BASE64_ENCODER.encode(bytes);
+      HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
+
+      // {@link Integer#toBinaryString} excludes leading zeros, which we want to preserve.
+      // Force the first bit to be set to get around that.
+      String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | Integer.MIN_VALUE);
+      // Limit the hash length to HASH_BINARY_STRING_BITS
+      String hash =
+          hashAsBinaryString.substring(hashAsBinaryString.length() - HASH_BINARY_STRING_BITS);
+      return dirsFromHash(hash);
+    }
+
+    /**
+     * Divides the hash into directories for an optimized orphan removal operation, using
+     * ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH.
+     *
+     * @param hash 10011001100110011001
+     * @return 1001/1001/1001/10011001 with depth 3 and length 4
+     */
+    private String dirsFromHash(String hash) {
+      StringBuilder hashWithDirs = new StringBuilder();
+
+      for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) {
+        if (i > 0) {
+          hashWithDirs.append("/");
+        }
+        hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, hash.length()));
+      }
+
+      if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) {
+        hashWithDirs
+            .append("/")
+            .append(hash, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, hash.length());
+      }
+
+      return hashWithDirs.toString();
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 1652e9312f50..c137bcd3a2c3 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -244,6 +244,11 @@ private TableProperties() {}
   public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
+  // Excludes the partition values from the path when set to false and object store is enabled
+  public static final String WRITE_OBJECT_STORE_PARTITIONED_PATHS =
+      "write.object-storage.partitioned-paths";
+  public static final boolean WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = true;
+
   /**
   * @deprecated Use {@link #WRITE_DATA_LOCATION} instead.
  */
diff --git a/core/src/test/java/org/apache/iceberg/TestLocationProvider.java b/core/src/test/java/org/apache/iceberg/TestLocationProvider.java
index 7afb69483490..7edba51c3d85 100644
--- a/core/src/test/java/org/apache/iceberg/TestLocationProvider.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocationProvider.java
@@ -240,7 +240,6 @@ public void testObjectStorageLocationProviderPathResolution() {
     String dataPath = "s3://random/data/location";
     table.updateProperties().set(TableProperties.WRITE_DATA_LOCATION, dataPath).commit();
 
-
     assertThat(table.locationProvider().newDataLocation("file"))
         .as("write data path should be used when set")
         .contains(dataPath);
@@ -279,12 +278,12 @@ public void testObjectStorageWithinTableLocation() {
     String fileLocation = table.locationProvider().newDataLocation("test.parquet");
     String relativeLocation = fileLocation.replaceFirst(table.location(), "");
     List<String> parts = Splitter.on("/").splitToList(relativeLocation);
-
-    assertThat(parts).hasSize(4);
+    assertThat(parts).hasSize(7);
     assertThat(parts).first().asString().isEmpty();
     assertThat(parts).element(1).asString().isEqualTo("data");
-    assertThat(parts).element(2).asString().isNotEmpty();
-    assertThat(parts).element(3).asString().isEqualTo("test.parquet");
+    // entropy dirs in the middle; none of them should be empty
+    assertThat(parts).elements(2, 3, 4, 5).doesNotContain("");
+    assertThat(parts).element(6).asString().isEqualTo("test.parquet");
   }
 
 @TestTemplate
@@ -304,4 +303,35 @@ public void testEncodedFieldNameInPartitionPath() {
 
     assertThat(partitionString).isEqualTo("data%231=val%231");
   }
+
+  @TestTemplate
+  public void testExcludePartitionInPath() {
+    // Enable the object storage layout and exclude partition values from file paths
+    table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED, "true").commit();
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, "false")
+        .commit();
+
+    // Generate a data location for partitioned data
+    StructLike partitionData = TestHelpers.CustomRow.of(0, "val");
+    String fileLocation =
+        table.locationProvider().newDataLocation(table.spec(), partitionData, "test.parquet");
+
+    // no partition values are included in the path, and the last part of the entropy is separated with "-"
+    assertThat(fileLocation).endsWith("/data/0110/1010/0011/11101000-test.parquet");
+  }
+
+  @TestTemplate
+  public void testHashInjection() {
+    table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED, "true").commit();
+    assertThat(table.locationProvider().newDataLocation("a"))
+        .endsWith("/data/0101/0110/1001/10110010/a");
+    assertThat(table.locationProvider().newDataLocation("b"))
+        .endsWith("/data/1110/0111/1110/00000011/b");
+    assertThat(table.locationProvider().newDataLocation("c"))
+        .endsWith("/data/0010/1101/0110/01011111/c");
+    assertThat(table.locationProvider().newDataLocation("d"))
+        .endsWith("/data/1001/0001/0100/01110011/d");
+  }
 }
diff --git a/docs/docs/aws.md b/docs/docs/aws.md
index e408cb5a2ae4..1a98a4d18e5b 100644
--- a/docs/docs/aws.md
+++ b/docs/docs/aws.md
@@ -343,7 +343,10 @@ Data stored in S3 with a traditional Hive storage layout can face S3 request throttling
 Iceberg by default uses the Hive storage layout but can be switched to use the `ObjectStoreLocationProvider`.
 With `ObjectStoreLocationProvider`, a deterministic hash is generated for each stored file, with the hash appended
-directly after the `write.data.path`. This ensures files written to s3 are equally distributed across multiple [prefixes](https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/) in the S3 bucket. Resulting in minimized throttling and maximized throughput for S3-related IO operations. When using `ObjectStoreLocationProvider` having a shared and short `write.data.path` across your Iceberg tables will improve performance.
+directly after the `write.data.path`. This ensures files written to S3 are equally distributed across multiple
+[prefixes](https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/) in the S3 bucket,
+resulting in minimized throttling and maximized throughput for S3-related IO operations. When using `ObjectStoreLocationProvider`,
+having a shared `write.data.path` across your Iceberg tables will improve performance.
 
 For more information on how S3 scales API QPS, check out the 2018 re:Invent session on [Best Practices for Amazon S3 and Amazon S3 Glacier](https://youtu.be/rHeTn9pHNKo?t=3219). At [53:39](https://youtu.be/rHeTn9pHNKo?t=3219) it covers how S3 scales/partitions & at [54:50](https://youtu.be/rHeTn9pHNKo?t=3290) it discusses the 30-60 minute wait time before new partitions are created.
 
@@ -357,7 +360,7 @@ CREATE TABLE my_catalog.my_ns.my_table (
 USING iceberg
 OPTIONS (
     'write.object-storage.enabled'=true,
-    'write.data.path'='s3://my-table-data-bucket')
+    'write.data.path'='s3://my-table-data-bucket/my_table')
 PARTITIONED BY (category);
 ```
@@ -366,9 +369,16 @@ We can then insert a single row into this new table
 INSERT INTO my_catalog.my_ns.my_table VALUES (1, "Pizza", "orders");
 ```
 
-Which will write the data to S3 with a hash (`2d3905f8`) appended directly after the `write.object-storage.path`, ensuring reads to the table are spread evenly across [S3 bucket prefixes](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html), and improving performance.
+Which will write the data to S3 with a 20-bit base2 hash (`01010110100110110010`) appended directly after the `write.object-storage.path`,
+ensuring reads to the table are spread evenly across [S3 bucket prefixes](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html), and improving performance.
+The previously used base64 hash was updated to base2 in order to provide improved auto-scaling behavior on S3 General Purpose Buckets.
+
+As part of this update, we have also divided the entropy into multiple directories in order to improve the efficiency of
+Iceberg's orphan file cleanup, since directories are used as a means to divide the work across workers for faster traversal.
+As the example below shows, we divide the hash into 4-bit directories with a depth of 3 and attach the final part of the
+hash to the end.
 ```
-s3://my-table-data-bucket/2d3905f8/my_ns.db/my_table/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
+s3://my-table-data-bucket/my_ns.db/my_table/0101/0110/1001/10110010/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
 ```
 
 Note, the path resolution logic for `ObjectStoreLocationProvider` is `write.data.path` then `<tableLocation>/data`.
@@ -378,6 +388,14 @@ However, for the older versions up to 0.12.0, the logic is as follows:
 
 For more details, please refer to the [LocationProvider Configuration](custom-catalog.md#custom-location-provider-implementation) section.
 
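+To make the directory scheme concrete, the following standalone sketch (illustrative only: it mirrors the
+`ObjectStoreLocationProvider` hashing shown earlier in this patch, but uses plain Guava rather than Iceberg's
+relocated copy, and the class and method names are invented for the example) reproduces how a file name becomes
+a 20-bit base2 hash split into entropy directories:
+
+```java
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
+
+public class EntropyDirsExample {
+
+  static String entropyDirs(String fileName) {
+    int hash = Hashing.murmur3_32_fixed().hashString(fileName, StandardCharsets.UTF_8).asInt();
+    // Set the top bit so toBinaryString always returns 32 characters, preserving leading zeros
+    String bits = Integer.toBinaryString(hash | Integer.MIN_VALUE);
+    // Keep only the low 20 bits (HASH_BINARY_STRING_BITS)
+    String entropy = bits.substring(bits.length() - 20);
+    // Three 4-bit directories (ENTROPY_DIR_DEPTH x ENTROPY_DIR_LENGTH); the final 8 bits stay together
+    return String.join(
+        "/",
+        entropy.substring(0, 4),
+        entropy.substring(4, 8),
+        entropy.substring(8, 12),
+        entropy.substring(12));
+  }
+
+  public static void main(String[] args) {
+    // Prints 0101/0110/1001/10110010, matching testHashInjection above
+    System.out.println(entropyDirs("a"));
+  }
+}
+```
+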
+We have also added a new table property, `write.object-storage.partitioned-paths`, which, when set to `false`
+(default: `true`), omits the partition values from the file path. Iceberg does not need these values in the file path,
+and setting this property to `false` can further reduce the key size. In this case, the final 8 bits of entropy are
+appended directly to the file name. With this property set, an inserted key would look like the following; note that `category=orders` is removed:
+```
+s3://my-table-data-bucket/my_ns.db/my_table/1101/0100/1011/00111010-00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
+```
+
 ### S3 Retries
 
 Workloads which encounter S3 throttling should persistently retry, with exponential backoff, to make progress while S3
diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md
index 264b9edfa7cc..07a98fd94515 100644
--- a/docs/docs/configuration.md
+++ b/docs/docs/configuration.md
@@ -77,6 +77,7 @@ Iceberg tables support table properties to configure table behavior, like the default split size for reads:
 | write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
 | write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory |
 | write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths |
+| write.object-storage.partitioned-paths | true | Includes the partition values in the file path |
 | write.data.path | table location + /data | Base location for data files |
 | write.metadata.path | table location + /metadata | Base location for metadata files |
 | write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 only) |
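+
+For example, the object storage properties above can be set programmatically (a minimal sketch; it assumes an
+existing `org.apache.iceberg.Table` handle named `table`, mirroring the property updates in the tests in this patch):
+
+```java
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
+public class ObjectStoragePropertiesExample {
+
+  static void enableObjectStorageLayout(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.OBJECT_STORE_ENABLED, "true")
+        // Optional: omit partition values from file paths to further shorten object keys
+        .set(TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, "false")
+        .commit();
+  }
+}
+```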