Skip to content

Commit

Permalink
[BugFix] fix iceberg partition prune on v2 table (backport #49569) (#49617)
Browse files Browse the repository at this point in the history

Co-authored-by: stephen <[email protected]>
  • Loading branch information
mergify[bot] and stephen-shelby authored Aug 8, 2024
1 parent 0b3519d commit 3531d31
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ public List<Integer> getSortKeyIndexes() {
return indexes;
}

// TODO(stephen): we should refactor this part to be compatible with cases of different transform result types
// in the same partition column.
// day(dt) -> identity dt
// Returns true when the table's current partition spec results from partition transform
// evolution (e.g. a day(dt) partition field later replaced by identity(dt)).
// - v1 format: Iceberg keeps evolved-away fields in the spec as void transforms, so the
//   presence of any void transform marks an evolved spec.
// - v2 format: dropped fields are removed outright (no void markers), so detect evolution
//   by the spec id instead — the original spec is id 0, and any specId > 0 means the
//   spec has been changed at least once.
public boolean hasPartitionTransformedEvolution() {
    return (!isV2Format() && getNativeTable().spec().fields().stream().anyMatch(field -> field.transform().isVoid())) ||
            (isV2Format() && getNativeTable().spec().specId() > 0);
}

public void resetSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,13 +747,14 @@ public static String convertIcebergPartitionToPartitionName(PartitionSpec partit
return ICEBERG_DEFAULT_PARTITION;
}

public static List<String> getIcebergPartitionValues(PartitionSpec spec, StructLike partition) {
public static List<String> getIcebergPartitionValues(PartitionSpec spec,
StructLike partition,
boolean existPartitionTransformedEvolution) {
PartitionData partitionData = (PartitionData) partition;
List<String> partitionValues = new ArrayList<>();
boolean existPartitionEvolution = spec.fields().stream().anyMatch(field -> field.transform().isVoid());
for (int i = 0; i < spec.fields().size(); i++) {
PartitionField partitionField = spec.fields().get(i);
if ((!partitionField.transform().isIdentity() && existPartitionEvolution) ||
if ((!partitionField.transform().isIdentity() && existPartitionTransformedEvolution) ||
(partitionField.transform().isVoid() && partitionData.get(i) == null)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,11 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
Set<List<String>> scannedPartitions = new HashSet<>();
PartitionSpec spec = icebergTable.getNativeTable().spec();
List<Column> partitionColumns = icebergTable.getPartitionColumnsIncludeTransformed();
boolean existPartitionTransformedEvolution = ((IcebergTable) table).hasPartitionTransformedEvolution();
for (FileScanTask fileScanTask : icebergSplitTasks) {
org.apache.iceberg.PartitionData partitionData = (org.apache.iceberg.PartitionData) fileScanTask.file().partition();
List<String> values = PartitionUtil.getIcebergPartitionValues(spec, partitionData);
List<String> values = PartitionUtil.getIcebergPartitionValues(
spec, partitionData, existPartitionTransformedEvolution);

if (values.size() != partitionColumns.size()) {
// ban partition evolution and non-identify column.
Expand Down Expand Up @@ -640,7 +642,7 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
partitionField)).getType());
}

if (icebergTable.hasPartitionTransformedEvolution()) {
if (existPartitionTransformedEvolution) {
srTypes = partitionColumns.stream()
.map(Column::getType)
.collect(Collectors.toList());
Expand Down
15 changes: 15 additions & 0 deletions test/sql/test_iceberg/R/test_iceberg_transformed_partition
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- name: test_iceberg_transformed_partition
create external catalog iceberg_sql_test_${uuid0} PROPERTIES ("type"="iceberg", "iceberg.catalog.type"="hive", "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}","enable_iceberg_metadata_cache"="true","aws.s3.access_key" = "${oss_ak}","aws.s3.secret_key" = "${oss_sk}","aws.s3.endpoint" = "${oss_endpoint}");
-- result:
-- !result
function: assert_explain_contains("select * from iceberg_sql_test_${uuid0}.iceberg_ci_db.test_identity_to_bucket_partition;","cardinality=1")
-- result:
None
-- !result
function: assert_explain_contains("select * from iceberg_sql_test_${uuid0}.iceberg_ci_db.day_partition;","cardinality=1")
-- result:
None
-- !result
drop catalog iceberg_sql_test_${uuid0};
-- result:
-- !result
8 changes: 8 additions & 0 deletions test/sql/test_iceberg/T/test_iceberg_transformed_partition
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- name: test_iceberg_transformed_partition

create external catalog iceberg_sql_test_${uuid0} PROPERTIES ("type"="iceberg", "iceberg.catalog.type"="hive", "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}","enable_iceberg_metadata_cache"="true","aws.s3.access_key" = "${oss_ak}","aws.s3.secret_key" = "${oss_sk}","aws.s3.endpoint" = "${oss_endpoint}");

function: assert_explain_contains("select * from iceberg_sql_test_${uuid0}.iceberg_ci_db.test_identity_to_bucket_partition;","cardinality=1")
function: assert_explain_contains("select * from iceberg_sql_test_${uuid0}.iceberg_ci_db.day_partition;","cardinality=1")

drop catalog iceberg_sql_test_${uuid0};

0 comments on commit 3531d31

Please sign in to comment.