[BugFix] fix refresh mv error when hive table has null values in the partition

Signed-off-by: AlgoLin <[email protected]>
kobebryantlin0 committed Sep 20, 2024
1 parent 37d72ac commit 7b4f87b

Showing 3 changed files with 121 additions and 2 deletions.
@@ -487,7 +487,9 @@ public static Map<String, Range<PartitionKey>> getRangePartitionMapOfExternalTable
         if (lowerBound.getKeys().get(0).isNullable()) {
             // If partition key is NULL literal, rewrite it to min value.
             lowerBound = PartitionKey.createInfinityPartitionKeyWithType(
-                    ImmutableList.of(partitionColumn.getPrimitiveType()), false);
+                    ImmutableList.of(isConvertToDate ? partitionExpr.getType().getPrimitiveType()
+                            : partitionColumn.getPrimitiveType()), false);
+            lowerBound = convertPartitionKeyIfNeeded(lowerBound, isConvertToDate);
         }
         Preconditions.checkState(!mvPartitionRangeMap.containsKey(partitionKeyName));
         PartitionKey upperBound = nextPartitionKey(lowerBound, partitionDateTimeInterval, partitionColPrimType,
@@ -497,6 +499,13 @@ public static Map<String, Range<PartitionKey>> getRangePartitionMapOfExternalTable
         return mvPartitionRangeMap;
     }
 
+    public static PartitionKey convertPartitionKeyIfNeeded(PartitionKey partitionKey, boolean isConvertToDate) {
+        if (isConvertToDate) {
+            return convertToString(partitionKey);
+        }
+        return partitionKey;
+    }
+
     public static PartitionKey nextPartitionKey(PartitionKey lastPartitionKey,
                                                 DateTimeInterval dateTimeInterval,
                                                 PrimitiveType partitionColPrimType,
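
Taken together, the two hunks above change how a NULL Hive partition value becomes a range lower bound: the min-value ("infinity") key is now created with the partition expression's type, which is DATE when the MV partitions by str2date, rather than the source column's VARCHAR type, and convertPartitionKeyIfNeeded then renders it back into a string key so it stays comparable with the other string partition values. A minimal, self-contained sketch of that decision, in plain Java with java.time standing in for StarRocks' partition-key types (all names here are illustrative assumptions, not engine code):

import java.time.LocalDate;

public class NullPartitionLowerBoundSketch {
    // Stand-in for createInfinityPartitionKeyWithType + convertPartitionKeyIfNeeded:
    // with isConvertToDate, build the min value as a DATE and render it back as a
    // string, so the NULL partition's lower bound is a parsable date string instead
    // of a raw VARCHAR min value.
    static String minLowerBound(boolean isConvertToDate) {
        if (isConvertToDate) {
            LocalDate minDate = LocalDate.of(0, 1, 1); // DATE "negative infinity"
            return minDate.toString();                 // "0000-01-01", a parsable date string
        }
        return ""; // VARCHAR min value: not a date, the pre-fix failure mode
    }

    public static void main(String[] args) {
        System.out.println(minLowerBound(true));  // 0000-01-01
        System.out.println(minLowerBound(false)); // (empty string)
    }
}

The point is that the lower bound derived from the NULL partition must be a parsable date string when str2date is in play; a raw VARCHAR min value is not, which appears to be what broke the refresh.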
@@ -676,6 +685,27 @@ private static PartitionKey convertToDate(PartitionKey partitionKey) throws SemanticException
         }
     }
 
+    /**
+     * Convert a date type partition key to a string type partition key.
+     * @param partitionKey : input date partition key to convert.
+     * @return : partition key with string type if the input can be converted.
+     */
+    private static PartitionKey convertToString(PartitionKey partitionKey) throws SemanticException {
+        PartitionKey newPartitionKey = new PartitionKey();
+        try {
+            DateLiteral dateLiteral = (DateLiteral) (partitionKey.getKeys().get(0));
+            StringLiteral literalExpr = new StringLiteral(dateLiteral.getStringValue());
+            newPartitionKey.pushColumn(literalExpr, PrimitiveType.VARCHAR);
+            return newPartitionKey;
+        } catch (SemanticException e) {
+            SemanticException semanticException =
+                    new SemanticException("convert date %s to string partition key failed",
+                            partitionKey.getKeys().get(0).getStringValue());
+            semanticException.addSuppressed(e);
+            throw semanticException;
+        }
+    }
+
     private static void putMvPartitionKeyIntoMap(Table table, Column partitionColumn, List<PartitionKey> partitionKeys,
                                                  Map<String, PartitionKey> mvPartitionKeyMap, String partitionValue)
             throws AnalysisException {
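
The new convertToString is the inverse of the pre-existing convertToDate named in the hunk header above. A rough standalone analogue of the round trip, using java.time in place of StarRocks' DateLiteral/StringLiteral (an assumption made for illustration):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionKeyRoundTrip {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Analogue of convertToDate: a string partition value becomes a DATE key.
    static LocalDate toDateKey(String value) {
        return LocalDate.parse(value, FMT);
    }

    // Analogue of convertToString: a DATE key is rendered back to a VARCHAR key.
    static String toStringKey(LocalDate key) {
        return FMT.format(key);
    }

    public static void main(String[] args) {
        System.out.println(toStringKey(toDateKey("2020-01-03"))); // 2020-01-03
    }
}

Assuming DateLiteral.getStringValue() emits the canonical yyyy-MM-dd text, the round trip should be lossless, which is what lets the rewritten NULL lower bound re-enter the string-keyed range map.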
@@ -725,6 +725,7 @@ public static void mockPartitionTable() {
         mockT1WithMultiPartitionColumns(); // t1_par
         mockT2WithMultiPartitionColumns(); // t2_par
         mockT1WithNullPartitionColumns(); // t1_par_null
+        mockT2WithNullPartitionColumns(); // t2_par_null
         mockTablesWithSinglePartitionColumn();
         mockOrders();
         mockWithMultiDuplicatePartitionColumns();
@@ -1603,6 +1604,67 @@ public static void mockT2WithMultiPartitionColumns() {
                 partitionNames, (long) rowCount, columnStatisticMap, remoteFileInfos));
     }
 
+    public static void mockT2WithNullPartitionColumns() {
+        MOCK_TABLE_MAP.putIfAbsent(MOCKED_PARTITIONED_DB_NAME, new CaseInsensitiveMap<>());
+        Map<String, HiveTableInfo> mockTables = MOCK_TABLE_MAP.get(MOCKED_PARTITIONED_DB_NAME);
+
+        List<FieldSchema> cols = Lists.newArrayList();
+        cols.add(new FieldSchema("c1", "int", null));
+        cols.add(new FieldSchema("c2", "string", null));
+        cols.add(new FieldSchema("c3", "string", null));
+
+        StorageDescriptor sd =
+                new StorageDescriptor(cols, "", MAPRED_PARQUET_INPUT_FORMAT_CLASS, "", false,
+                        -1, null, Lists.newArrayList(), Lists.newArrayList(), Maps.newHashMap());
+        Table t1 = new Table("t2_par_null", "partitioned_db", null, 0, 0, 0, sd,
+                ImmutableList.of(new FieldSchema("par_date", "string", null)), Maps.newHashMap(), null, null,
+                "EXTERNAL_TABLE");
+        List<String> partitionNames = Lists.newArrayList("par_date=2020-01-01", "par_date=2020-01-02",
+                "par_date=2020-01-03", "par_date=__HIVE_DEFAULT_PARTITION__", "par_date=2020-01-03", "par_date=2020-01-04");
+        Map<String, HivePartitionStats> hivePartitionStatsMap = Maps.newHashMap();
+        double avgNumPerPartition = (double) (100 / 3);
+        double rowCount = 100;
+
+        List<PartitionKey> partitionKeyList = Lists.newArrayList();
+        Column partitionColumn1 = new Column("par_date", Type.STRING);
+        List<Column> partitionColumns = ImmutableList.of(partitionColumn1);
+        try {
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-02"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-02"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-03"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("__HIVE_DEFAULT_PARTITION__"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-03"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-04"), partitionColumns));
+        } catch (AnalysisException e) {
+            throw new RuntimeException(e);
+        }
+
+        List<String> partitionColumnNames = ImmutableList.of("par_date");
+        ColumnStatistic partitionColumnStats1 =
+                getPartitionColumnStatistic(partitionColumn1, partitionKeyList, partitionColumnNames,
+                        hivePartitionStatsMap, avgNumPerPartition, rowCount);
+
+        Map<String, ColumnStatistic> columnStatisticMap;
+        List<String> colNames = cols.stream().map(FieldSchema::getName).collect(Collectors.toList());
+        columnStatisticMap =
+                colNames.stream().collect(Collectors.toMap(Function.identity(), col -> ColumnStatistic.unknown()));
+        columnStatisticMap.put("par_date", partitionColumnStats1);
+
+        List<RemoteFileInfo> remoteFileInfos = Lists.newArrayList();
+        partitionNames.forEach(
+                k -> remoteFileInfos.add(new RemoteFileInfo(RemoteFileInputFormat.ORC, ImmutableList.of(), null)));
+
+        mockTables.put(t1.getTableName(),
+                new HiveTableInfo(HiveMetastoreApiConverter.toHiveTable(t1, MOCKED_HIVE_CATALOG_NAME),
+                        partitionNames, (long) rowCount, columnStatisticMap, remoteFileInfos));
+    }
+
     public static void mockTablesWithSinglePartitionColumn() {
         MOCK_TABLE_MAP.putIfAbsent(MOCKED_PARTITIONED_DB_NAME, new CaseInsensitiveMap<>());
         Map<String, HiveTableInfo> mockTables = MOCK_TABLE_MAP.get(MOCKED_PARTITIONED_DB_NAME);
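
The entry that matters in this mock is par_date=__HIVE_DEFAULT_PARTITION__: Hive stores rows whose partition value is NULL under that sentinel name (governed by hive.exec.default.partition.name). A tiny sketch of the mapping any consumer of these partition names has to apply — illustrative only, not the connector's actual decoder:

public class HiveNullPartitionSketch {
    // Hive's default sentinel for NULL partition values.
    static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";

    // Map a raw partition value back to NULL when it is the sentinel.
    static String decode(String rawValue) {
        return HIVE_DEFAULT_PARTITION.equals(rawValue) ? null : rawValue;
    }

    public static void main(String[] args) {
        System.out.println(decode("2020-01-03"));           // 2020-01-03
        System.out.println(decode(HIVE_DEFAULT_PARTITION)); // null
    }
}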
@@ -119,7 +119,17 @@ public static void beforeClass() throws Exception {
                         "\"storage_medium\" = \"HDD\"\n" +
                         ")\n" +
                         "AS SELECT t1.c1, t1.c2, t1_par.par_col, t1_par.par_date FROM `hive0`.`partitioned_db`.`t1` join " +
-                        "`hive0`.`partitioned_db`.`t1_par` using (par_col)");
+                        "`hive0`.`partitioned_db`.`t1_par` using (par_col)")
+                .withMaterializedView("CREATE MATERIALIZED VIEW `test`.`hive_parttbl_mv2`\n" +
+                        "COMMENT \"MATERIALIZED_VIEW\"\n" +
+                        "PARTITION BY str2date(`par_date`, '%Y-%m-%d')\n" +
+                        "DISTRIBUTED BY HASH(`c1`) BUCKETS 10\n" +
+                        "REFRESH DEFERRED MANUAL\n" +
+                        "PROPERTIES (\n" +
+                        "\"replication_num\" = \"1\",\n" +
+                        "\"storage_medium\" = \"HDD\"\n" +
+                        ")\n" +
+                        "AS SELECT c1, c2, par_date FROM `hive0`.`partitioned_db`.`t2_par_null`;");
     }
 
     @AfterClass
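
The new hive_parttbl_mv2 partitions by str2date(`par_date`, '%Y-%m-%d') over t2_par_null, which is exactly the path this commit fixes: every string partition value must convert to a DATE key, and the NULL sentinel cannot. In rough Java terms (an analogy to str2date, not the engine's implementation):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class Str2DateSketch {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        // Ordinary partition values parse cleanly into DATE partition keys:
        System.out.println(LocalDate.parse("2020-01-04", fmt)); // 2020-01-04
        // The NULL sentinel does not, which is the failure the min-value rewrite
        // in the first file of this commit guards against:
        try {
            LocalDate.parse("__HIVE_DEFAULT_PARTITION__", fmt);
        } catch (DateTimeParseException e) {
            System.out.println("unparsable partition value: " + e.getParsedString());
        }
    }
}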
@@ -1240,4 +1250,21 @@ public void refreshTable(String catalogName, String srDbName, Table table,
                 ),
                 calls);
     }
+
+    @Test
+    public void testStr2DateMVRefreshRewriteSingleTableRewrite() throws Exception {
+        Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");
+        MaterializedView materializedView = ((MaterializedView) testDb.getTable("hive_parttbl_mv2"));
+
+        Task task = TaskBuilder.buildMvTask(materializedView, testDb.getFullName());
+        TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
+        initAndExecuteTaskRun(taskRun);
+        PartitionBasedMvRefreshProcessor processor = (PartitionBasedMvRefreshProcessor)
+                taskRun.getProcessor();
+
+        MvTaskRunContext mvContext = processor.getMvContext();
+        ExecPlan execPlan = mvContext.getExecPlan();
+        String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
+        Assert.assertTrue(plan.contains("t2_par_null"));
+    }
 }
