Skip to content

Commit

Permalink
[Enhancement](sql-cache) Add partition update time for hms table and …
Browse files Browse the repository at this point in the history
…use it at sql-cache. (apache#24491)

Now FE does not record the update time of hms tbl's partitons, so the sql cache may be hit even the hive table's partitions have changed. This pr add a field to record the partition update time, and use it when enable sql-cache.
The cache will be missed if any partition has changed at hive side.

Use System.currentTimeMillis() but not the event time of hms event because we would better keep the same measurement with the schemaUpdateTime of external table. Add this value to ExternalObjectLog and let slave FEs replay it because it is better to keep the same value with all FEs, so the sql-cache can be hit by the querys through different FEs.
  • Loading branch information
dutyu authored Oct 11, 2023
1 parent 2f706cc commit 1e6d34d
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
protected long timestamp;
@SerializedName(value = "dbName")
protected String dbName;
@SerializedName(value = "lastUpdateTime")
protected long lastUpdateTime;
// this field will be refreshed after reloading schema
protected volatile long schemaUpdateTime;

protected long dbId;
protected boolean objectCreated;
Expand Down Expand Up @@ -296,9 +296,12 @@ public long getCreateTime() {
return 0;
}

// return schema update time as default
// override this method if there is some other kinds of update time
// use getSchemaUpdateTime if just need the schema update time
@Override
public long getUpdateTime() {
return 0;
return this.schemaUpdateTime;
}

@Override
Expand Down Expand Up @@ -353,7 +356,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
* @return
*/
public List<Column> initSchemaAndUpdateTime() {
lastUpdateTime = System.currentTimeMillis();
schemaUpdateTime = System.currentTimeMillis();
return initSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public class HMSExternalTable extends ExternalTable {
// No as precise as row count in TableStats, but better than none.
private long estimatedRowCount = -1;

// record the partition update time when enable hms event listener
protected volatile long partitionUpdateTime;

public enum DLAType {
UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE
}
Expand Down Expand Up @@ -270,11 +273,6 @@ public long getCreateTime() {
return 0;
}

@Override
public long getUpdateTime() {
return 0;
}

@Override
public long getRowCount() {
makeSureInitialized();
Expand Down Expand Up @@ -630,6 +628,17 @@ private void setStatData(Column col, ColumnStatisticsData data, ColumnStatisticB
}
}

public void setPartitionUpdateTime(long updateTime) {
this.partitionUpdateTime = updateTime;
}

@Override
// get the max value of `schemaUpdateTime` and `partitionUpdateTime`
// partitionUpdateTime will be refreshed after processing partition events with hms event listener enabled
public long getUpdateTime() {
return Math.max(this.schemaUpdateTime, this.partitionUpdateTime);
}

@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
lastUpdateTime = System.currentTimeMillis();
schemaUpdateTime = System.currentTimeMillis();
objectCreated = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public ProcResult fetchResult() throws AnalysisException {
tableInfo.add(FeConstants.null_string);
tableInfo.add(replicaCount);
}
tableInfo.add(TimeUtils.longToTimeString(table.getLastUpdateTime()));
tableInfo.add(TimeUtils.longToTimeString(table.getUpdateTime()));
tableInfos.add(tableInfo);
} finally {
table.readUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
Expand Down Expand Up @@ -928,14 +929,21 @@ public void addExternalPartitions(String catalogName, String dbName, String tabl
}
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}

Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(),
(ExternalTable) table, partitionNames);
HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
long lastPartitionUpdateTime = System.currentTimeMillis();
hmsTable.setPartitionUpdateTime(lastPartitionUpdateTime);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(lastPartitionUpdateTime);
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
}

Expand All @@ -957,9 +965,16 @@ public void replayAddExternalPartitions(ExternalObjectLog log) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}

HMSExternalTable hmsTable = (HMSExternalTable) table;
try {
Env.getCurrentEnv().getExtMetaCacheMgr()
.addPartitionsCache(catalog.getId(), table, log.getPartitionNames());
.addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
} catch (HMSClientException e) {
LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e);
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
Expand Down Expand Up @@ -998,6 +1013,7 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(System.currentTimeMillis());
replayDropExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
}
Expand All @@ -1020,8 +1036,14 @@ public void replayDropExternalPartitions(ExternalObjectLog log) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr()
.dropPartitionsCache(catalog.getId(), table, log.getPartitionNames());
.dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
}

public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
Expand Down Expand Up @@ -1058,6 +1080,7 @@ public void refreshExternalPartitions(String catalogName, String dbName, String
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(System.currentTimeMillis());
replayRefreshExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
}
Expand All @@ -1080,9 +1103,14 @@ public void replayRefreshExternalPartitions(ExternalObjectLog log) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
log.getPartitionNames());
((HMSExternalTable) table).setPartitionUpdateTime(log.getLastUpdateTime());
}

public void registerCatalogRefreshListener(Env env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.datasource;

import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -159,25 +158,16 @@ public void invalidateCatalogCache(long catalogId) {
LOG.debug("invalid catalog cache for {}", catalogId);
}

public void addPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
public void addPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
if (metaCache != null) {
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames,
((HMSExternalTable) table).getPartitionColumnTypes());
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, table.getPartitionColumnTypes());
}
LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
}

public void dropPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
if (metaCache != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ private CacheTable buildCacheTableForHiveScanNode(HiveScanNode node) {
CacheTable cacheTable = new CacheTable();
cacheTable.table = node.getTargetTable();
cacheTable.partitionNum = node.getReadPartitionNum();
cacheTable.latestTime = cacheTable.table.getLastUpdateTime();
cacheTable.latestTime = cacheTable.table.getUpdateTime();
return cacheTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public String getSqlWithViewStmt() {
return cacheKey;
}

public long getLatestTime() {
return latestTable.latestTime;
}

public long getSumOfPartitionNum() {
return latestTable.sumOfPartitionNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public void testRefreshTable() throws Exception {
CatalogIf test1 = env.getCatalogMgr().getCatalog("test1");
TestExternalTable table = (TestExternalTable) test1.getDbNullable("db1").getTable("tbl11").get();
Assertions.assertFalse(table.isObjectCreated());
long l1 = table.getLastUpdateTime();
long l1 = table.getSchemaUpdateTime();
Assertions.assertTrue(l1 == 0);
table.makeSureInitialized();
Assertions.assertTrue(table.isObjectCreated());
long l2 = table.getLastUpdateTime();
long l2 = table.getSchemaUpdateTime();
Assertions.assertTrue(l2 == l1);
RefreshTableStmt refreshTableStmt = new RefreshTableStmt(new TableName("test1", "db1", "tbl11"));
try {
Expand All @@ -82,12 +82,15 @@ public void testRefreshTable() throws Exception {
// Do nothing
}
Assertions.assertFalse(table.isObjectCreated());
long l3 = table.getLastUpdateTime();
long l3 = table.getSchemaUpdateTime();
Assertions.assertTrue(l3 == l2);
table.getFullSchema();
// only table.getFullSchema() can change table.lastUpdateTime
long l4 = table.getLastUpdateTime();
long l4 = table.getSchemaUpdateTime();
Assertions.assertTrue(l4 > l3);
// updateTime is equal to schema update time as default
long l5 = table.getUpdateTime();
Assertions.assertTrue(l5 == l4);
}

public static class RefreshTableProvider implements TestExternalCatalog.TestCatalogProvider {
Expand Down
Loading

0 comments on commit 1e6d34d

Please sign in to comment.