AWS: Switch to base2 entropy in ObjectStoreLocationProvider for optimized S3 performance (#11112)

Co-authored-by: Drew Schleit <[email protected]>
ookumuso and drewschleit authored Oct 18, 2024
1 parent d61a98d commit ea5da17
Showing 5 changed files with 123 additions and 18 deletions.
69 changes: 60 additions & 9 deletions core/src/main/java/org/apache/iceberg/LocationProviders.java
@@ -27,7 +27,6 @@
import org.apache.iceberg.relocated.com.google.common.hash.HashCode;
import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;

@@ -108,10 +107,15 @@ public String newDataLocation(String filename) {
static class ObjectStoreLocationProvider implements LocationProvider {

private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
private static final BaseEncoding BASE64_ENCODER = BaseEncoding.base64Url().omitPadding();
private static final ThreadLocal<byte[]> TEMP = ThreadLocal.withInitial(() -> new byte[4]);
// Length of entropy generated in the file location
private static final int HASH_BINARY_STRING_BITS = 20;
// Entropy generated will be divided into dirs of this length
private static final int ENTROPY_DIR_LENGTH = 4;
// Will create this many dirs from the entropy
private static final int ENTROPY_DIR_DEPTH = 3;
private final String storageLocation;
private final String context;
private final boolean includePartitionPaths;

ObjectStoreLocationProvider(String tableLocation, Map<String, String> properties) {
this.storageLocation =
@@ -123,6 +127,11 @@ static class ObjectStoreLocationProvider {
} else {
this.context = pathContext(tableLocation);
}
this.includePartitionPaths =
PropertyUtil.propertyAsBoolean(
properties,
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT);
}

private static String dataLocation(Map<String, String> properties, String tableLocation) {
@@ -141,7 +150,12 @@ private static String dataLocation(Map<String, String> properties, String tableLocation) {

@Override
public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
return newDataLocation(String.format("%s/%s", spec.partitionToPath(partitionData), filename));
if (includePartitionPaths) {
return newDataLocation(
String.format("%s/%s", spec.partitionToPath(partitionData), filename));
} else {
return newDataLocation(filename);
}
}

@Override
@@ -150,7 +164,13 @@ public String newDataLocation(String filename) {
if (context != null) {
return String.format("%s/%s/%s/%s", storageLocation, hash, context, filename);
} else {
return String.format("%s/%s/%s", storageLocation, hash, filename);
// if partition paths are included, add the last part of the entropy as a dir before the partition names
if (includePartitionPaths) {
return String.format("%s/%s/%s", storageLocation, hash, filename);
} else {
// if partition paths are not included, append the last part of the entropy to the file name, separated by `-`
return String.format("%s/%s-%s", storageLocation, hash, filename);
}
}
}

@@ -172,10 +192,41 @@ private static String pathContext(String tableLocation) {
}

private String computeHash(String fileName) {
byte[] bytes = TEMP.get();
HashCode hash = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
hash.writeBytesTo(bytes, 0, 4);
return BASE64_ENCODER.encode(bytes);
HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);

// {@link Integer#toBinaryString} excludes leading zeros, which we want to preserve.
// Force the first bit to be set to get around that.
String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | Integer.MIN_VALUE);
// Limit hash length to HASH_BINARY_STRING_BITS
String hash =
hashAsBinaryString.substring(hashAsBinaryString.length() - HASH_BINARY_STRING_BITS);
return dirsFromHash(hash);
}

/**
 * Divides the hash into directories to optimize the orphan-file removal operation, using
 * ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH.
 *
 * @param hash 10011001100110011001
 * @return 1001/1001/1001/10011001 with depth 3 and length 4
 */
private String dirsFromHash(String hash) {
StringBuilder hashWithDirs = new StringBuilder();

for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) {
if (i > 0) {
hashWithDirs.append("/");
}
hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, hash.length()));
}

if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) {
hashWithDirs
.append("/")
.append(hash, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, hash.length());
}

return hashWithDirs.toString();
}
}
}
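
To make the new path layout concrete, here is a standalone sketch of the same entropy computation. It uses plain Guava rather than Iceberg's relocated copy (an assumption made only to keep the example self-contained); the constants and the `hash | Integer.MIN_VALUE` trick mirror the diff above, and the output for `"a"` matches the `testHashInjection` expectations further down.

```
import java.nio.charset.StandardCharsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class EntropyDemo {
  private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
  private static final int HASH_BINARY_STRING_BITS = 20;
  private static final int ENTROPY_DIR_LENGTH = 4;
  private static final int ENTROPY_DIR_DEPTH = 3;

  static String computeHash(String fileName) {
    int hash = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8).asInt();
    // Setting the sign bit guarantees a full 32-character binary string, so the
    // trailing HASH_BINARY_STRING_BITS bits keep their leading zeros.
    String bits = Integer.toBinaryString(hash | Integer.MIN_VALUE);
    String entropy = bits.substring(bits.length() - HASH_BINARY_STRING_BITS);

    // Split the first ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH bits into dirs and
    // append the remainder (valid here because 20 bits > 3 * 4 bits).
    StringBuilder dirs = new StringBuilder();
    for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) {
      dirs.append(entropy, i, i + ENTROPY_DIR_LENGTH).append('/');
    }
    dirs.append(entropy, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, entropy.length());
    return dirs.toString();
  }

  public static void main(String[] args) {
    System.out.println(computeHash("a")); // prints 0101/0110/1001/10110010
  }
}
```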
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -244,6 +244,11 @@ private TableProperties() {}
public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;

// Excludes the partition values in the path when set to false and object store is enabled
public static final String WRITE_OBJECT_STORE_PARTITIONED_PATHS =
"write.object-storage.partitioned-paths";
public static final boolean WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = true;

/**
* @deprecated Use {@link #WRITE_DATA_LOCATION} instead.
*/
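Both flags are ordinary table properties, so they can be toggled through the `UpdateProperties` API exactly as the tests below do. A minimal sketch, assuming an already-loaded `Table` handle:

```
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class ObjectStoreLayout {
  // Enables the object-store layout and drops partition values from data paths.
  static void enable(Table table) {
    table
        .updateProperties()
        .set(TableProperties.OBJECT_STORE_ENABLED, "true")
        .set(TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, "false")
        .commit();
  }
}
```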
40 changes: 35 additions & 5 deletions core/src/test/java/org/apache/iceberg/TestLocationProvider.java
@@ -240,7 +240,6 @@ public void testObjectStorageLocationProviderPathResolution() {

String dataPath = "s3://random/data/location";
table.updateProperties().set(TableProperties.WRITE_DATA_LOCATION, dataPath).commit();

assertThat(table.locationProvider().newDataLocation("file"))
.as("write data path should be used when set")
.contains(dataPath);
@@ -279,12 +278,12 @@ public void testObjectStorageWithinTableLocation() {
String fileLocation = table.locationProvider().newDataLocation("test.parquet");
String relativeLocation = fileLocation.replaceFirst(table.location(), "");
List<String> parts = Splitter.on("/").splitToList(relativeLocation);

assertThat(parts).hasSize(4);
assertThat(parts).hasSize(7);
assertThat(parts).first().asString().isEmpty();
assertThat(parts).element(1).asString().isEqualTo("data");
assertThat(parts).element(2).asString().isNotEmpty();
assertThat(parts).element(3).asString().isEqualTo("test.parquet");
// entropy dirs in the middle
assertThat(parts).elements(2, 3, 4, 5).asString().isNotEmpty();
assertThat(parts).element(6).asString().isEqualTo("test.parquet");
}

@TestTemplate
@@ -304,4 +303,35 @@ public void testEncodedFieldNameInPartitionPath() {

assertThat(partitionString).isEqualTo("data%231=val%231");
}

@TestTemplate
public void testExcludePartitionInPath() {
// Enable object storage and exclude partition values from file paths
table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED, "true").commit();
table
.updateProperties()
.set(TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, "false")
.commit();

// Provide partition data that would normally appear in the path
StructLike partitionData = TestHelpers.CustomRow.of(0, "val");
String fileLocation =
table.locationProvider().newDataLocation(table.spec(), partitionData, "test.parquet");

// no partition values included in the path and the last part of entropy is separated with "-"
assertThat(fileLocation).endsWith("/data/0110/1010/0011/11101000-test.parquet");
}

@TestTemplate
public void testHashInjection() {
table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED, "true").commit();
assertThat(table.locationProvider().newDataLocation("a"))
.endsWith("/data/0101/0110/1001/10110010/a");
assertThat(table.locationProvider().newDataLocation("b"))
.endsWith("/data/1110/0111/1110/00000011/b");
assertThat(table.locationProvider().newDataLocation("c"))
.endsWith("/data/0010/1101/0110/01011111/c");
assertThat(table.locationProvider().newDataLocation("d"))
.endsWith("/data/1001/0001/0100/01110011/d");
}
}
26 changes: 22 additions & 4 deletions docs/docs/aws.md
@@ -343,7 +343,10 @@ Data stored in S3 with a traditional Hive storage layout can face S3 request throttling

Iceberg by default uses the Hive storage layout but can be switched to use the `ObjectStoreLocationProvider`.
With `ObjectStoreLocationProvider`, a deterministic hash is generated for each stored file, with the hash appended
directly after the `write.data.path`. This ensures files written to s3 are equally distributed across multiple [prefixes](https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/) in the S3 bucket. Resulting in minimized throttling and maximized throughput for S3-related IO operations. When using `ObjectStoreLocationProvider` having a shared and short `write.data.path` across your Iceberg tables will improve performance.
directly after the `write.data.path`. This ensures files written to S3 are equally distributed across multiple
[prefixes](https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/) in the S3 bucket,
resulting in minimized throttling and maximized throughput for S3-related IO operations. When using `ObjectStoreLocationProvider`,
having a shared `write.data.path` across your Iceberg tables will improve performance.

For more information on how S3 scales API QPS, check out the 2018 re:Invent session on [Best Practices for Amazon S3 and Amazon S3 Glacier](https://youtu.be/rHeTn9pHNKo?t=3219). At [53:39](https://youtu.be/rHeTn9pHNKo?t=3219) it covers how S3 scales/partitions & at [54:50](https://youtu.be/rHeTn9pHNKo?t=3290) it discusses the 30-60 minute wait time before new partitions are created.

@@ -357,7 +360,7 @@ CREATE TABLE my_catalog.my_ns.my_table (
USING iceberg
OPTIONS (
'write.object-storage.enabled'=true,
'write.data.path'='s3://my-table-data-bucket')
'write.data.path'='s3://my-table-data-bucket/my_table')
PARTITIONED BY (category);
```

@@ -366,9 +369,16 @@ We can then insert a single row into this new table
INSERT INTO my_catalog.my_ns.my_table VALUES (1, "Pizza", "orders");
```

Which will write the data to S3 with a hash (`2d3905f8`) appended directly after the `write.object-storage.path`, ensuring reads to the table are spread evenly across [S3 bucket prefixes](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html), and improving performance.
Which will write the data to S3 with a 20-bit base2 hash (`01010110100110110010`) appended directly after the `write.object-storage.path`,
ensuring reads to the table are spread evenly across [S3 bucket prefixes](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html), and improving performance.
The previously used base64 hash has been replaced with base2 in order to improve auto-scaling behavior on S3 general purpose buckets.

As part of this update, the entropy is also divided across multiple directories in order to improve the efficiency of
Iceberg's orphan file cleanup, since directories are used as a means to divide the work across workers for faster traversal.
As the example below shows, the hash is split into 4-bit directories with a depth of 3, and the final part of the hash is
attached to the end.
```
s3://my-table-data-bucket/2d3905f8/my_ns.db/my_table/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
s3://my-table-data-bucket/my_ns.db/my_table/0101/0110/1001/10110010/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
```
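
The generated locations can also be inspected programmatically through the table's `LocationProvider`. A hedged sketch, assuming a `Table` handle loaded from your catalog (the file name is a placeholder):

```
import org.apache.iceberg.Table;
import org.apache.iceberg.io.LocationProvider;

class LocationInspector {
  // Prints where the provider would place a new data file; with object storage
  // enabled, the result contains the entropy dirs, e.g. .../0101/0110/1001/10110010/...
  static void printDataLocation(Table table) {
    LocationProvider locations = table.locationProvider();
    System.out.println(locations.newDataLocation("00000-0-example.parquet"));
  }
}
```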

Note, the path resolution logic for `ObjectStoreLocationProvider` is `write.data.path` then `<tableLocation>/data`.
@@ -378,6 +388,14 @@ However, for the older versions up to 0.12.0, the logic is as follows:

For more details, please refer to the [LocationProvider Configuration](custom-catalog.md#custom-location-provider-implementation) section.

We have also added a new table property, `write.object-storage.partitioned-paths` (default: `true`), which, when set
to `false`, omits the partition values from the file path. Iceberg does not need these values in the file path, and
setting the property to `false` can further reduce key size. In this case, the final 8 bits of entropy are appended
directly to the file name. With this property set, the inserted key would look like the following; note that
`category=orders` is removed:
```
s3://my-table-data-bucket/my_ns.db/my_table/1101/0100/1011/00111010-00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
```
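
For completeness, the same behavior is visible through the partition-aware `newDataLocation` overload from the diff above; a sketch where `partitionRow` stands in for real partition data:

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

class PartitionlessPaths {
  // With write.object-storage.partitioned-paths=false, the partition data is
  // accepted but omitted from the path, and the final 8 bits of entropy are
  // joined to the file name with "-" instead of forming a directory.
  static String dataLocation(Table table, StructLike partitionRow, String filename) {
    PartitionSpec spec = table.spec();
    return table.locationProvider().newDataLocation(spec, partitionRow, filename);
  }
}
```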

### S3 Retries

Workloads which encounter S3 throttling should persistently retry, with exponential backoff, to make progress while S3
1 change: 1 addition & 0 deletions docs/docs/configuration.md
@@ -77,6 +77,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
| write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory |
| write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths |
| write.object-storage.partitioned-paths | true | Includes the partition values in the file path |
| write.data.path | table location + /data | Base location for data files |
| write.metadata.path | table location + /metadata | Base location for metadata files |
| write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 only) |
