Skip to content

Commit

Permalink
[fix](metaCache)fix bug that names cache can not invalidate. (#46287)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
Related PR: #41510

Problem Summary:

In this pr#41510, namesCache changed from `LoadingCache<String,
List<String>>` to `LoadingCache<String, List<Pair<String, String>>>`,
but the `remove` method of this cache did not change, causing the hms
event case to fail.
  • Loading branch information
hubgeter authored Jan 3, 2025
1 parent 073fd68 commit dec53c4
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) {
if (useMetaCache.get()) {
// must use full qualified name to generate id.
// otherwise, if 2 catalogs have the same db name, the id will be the same.
return metaCache.getMetaObj(realDbName, Util.genIdByName(getQualifiedName(realDbName))).orElse(null);
return metaCache.getMetaObj(realDbName, Util.genIdByName(name, realDbName)).orElse(null);
} else {
if (dbNameToId.containsKey(realDbName)) {
return idToDb.get(dbNameToId.get(realDbName));
Expand Down Expand Up @@ -1081,10 +1081,6 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
}
}

public String getQualifiedName(String dbName) {
return String.join(".", name, dbName);
}

public void setAutoAnalyzePolicy(String dbName, String tableName, String policy) {
Pair<String, String> key = Pair.of(dbName, tableName);
if (policy == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ public T getTableNullable(String tableName) {
if (extCatalog.getUseMetaCache().get()) {
// must use full qualified name to generate id.
// otherwise, if 2 databases have the same table name, the id will be the same.
return metaCache.getMetaObj(tableName, Util.genIdByName(getQualifiedName(tableName))).orElse(null);
return metaCache.getMetaObj(tableName,
Util.genIdByName(extCatalog.getName(), name, tableName)).orElse(null);
} else {
if (!tableNameToId.containsKey(tableName)) {
return null;
Expand Down Expand Up @@ -655,7 +656,7 @@ public void unregisterTable(String tableName) {

if (extCatalog.getUseMetaCache().get()) {
if (isInitialized()) {
metaCache.invalidate(tableName, Util.genIdByName(getQualifiedName(tableName)));
metaCache.invalidate(tableName, Util.genIdByName(extCatalog.getName(), name, tableName));
lowerCaseToTableName.remove(tableName.toLowerCase());
}
} else {
Expand Down Expand Up @@ -688,7 +689,9 @@ public boolean registerTable(TableIf tableIf) {
}
if (extCatalog.getUseMetaCache().get()) {
if (isInitialized()) {
metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName)));
String localName = extCatalog.fromRemoteTableName(this.remoteName, tableName);
metaCache.updateCache(tableName, localName, (T) tableIf,
Util.genIdByName(extCatalog.getName(), name, localName));
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
}
} else {
Expand All @@ -704,10 +707,6 @@ public boolean registerTable(TableIf tableIf) {
return true;
}

public String getQualifiedName(String tblName) {
return String.join(".", extCatalog.getName(), name, tblName);
}

private boolean isStoredTableNamesLowerCase() {
// Because we have added a test configuration item,
// it needs to be judged together with Env.isStoredTableNamesLowerCase()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void unregisterDatabase(String dbName) {
}
if (useMetaCache.get()) {
if (isInitialized()) {
metaCache.invalidate(dbName, Util.genIdByName(getQualifiedName(dbName)));
metaCache.invalidate(dbName, Util.genIdByName(name, dbName));
}
} else {
Long dbId = dbNameToId.remove(dbName);
Expand All @@ -265,7 +265,8 @@ public void registerDatabase(long dbId, String dbName) {
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, null, dbId, logType, false);
if (useMetaCache.get()) {
if (isInitialized()) {
metaCache.updateCache(dbName, db, Util.genIdByName(getQualifiedName(dbName)));
metaCache.updateCache(db.getRemoteName(), db.getFullName(), db,
Util.genIdByName(name, db.getFullName()));
}
} else {
dbNameToId.put(dbName, dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

public class MetaCache<T> {
private LoadingCache<String, List<Pair<String, String>>> namesCache;
//Pair<String, String> : <Remote name, Local name>
private Map<Long, String> idToName = Maps.newConcurrentMap();
private LoadingCache<String, Optional<T>> metaObjCache;

Expand Down Expand Up @@ -101,29 +102,29 @@ public Optional<T> getMetaObjById(long id) {
return name == null ? Optional.empty() : getMetaObj(name, id);
}

public void updateCache(String objName, T obj, long id) {
metaObjCache.put(objName, Optional.of(obj));
public void updateCache(String remoteName, String localName, T obj, long id) {
metaObjCache.put(localName, Optional.of(obj));
namesCache.asMap().compute("", (k, v) -> {
if (v == null) {
return Lists.newArrayList(Pair.of(objName, objName));
return Lists.newArrayList(Pair.of(remoteName, localName));
} else {
v.add(Pair.of(objName, objName));
v.add(Pair.of(remoteName, localName));
return v;
}
});
idToName.put(id, objName);
idToName.put(id, localName);
}

public void invalidate(String objName, long id) {
public void invalidate(String localName, long id) {
namesCache.asMap().compute("", (k, v) -> {
if (v == null) {
return Lists.newArrayList();
} else {
v.remove(objName);
v.removeIf(pair -> pair.value().equals(localName));
return v;
}
});
metaObjCache.invalidate(objName);
metaObjCache.invalidate(localName);
idToName.remove(id);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.common.Pair;
import org.apache.doris.datasource.metacache.MetaCache;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MetaCacheTest {

private MetaCache<String> metaCache;

@Before
public void setUp() {
CacheLoader<String, List<Pair<String, String>>> namesCacheLoader = key -> Lists.newArrayList();
CacheLoader<String, Optional<String>> metaObjCacheLoader = key -> Optional.empty();
RemovalListener<String, Optional<String>> removalListener = (key, value, cause) -> {};

metaCache = new MetaCache<>(
"testCache",
Executors.newCachedThreadPool(),
OptionalLong.of(1),
OptionalLong.of(1),
100, // max size
namesCacheLoader,
metaObjCacheLoader,
removalListener
);
}

@Test
public void testListNames() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
metaCache.updateCache("remote2", "local2", "meta2", 2L);

List<String> names = metaCache.listNames();
Assert.assertEquals(2, names.size());
Assert.assertTrue(names.contains("local1"));
Assert.assertTrue(names.contains("local2"));
}

@Test
public void testGetRemoteName() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);

String remoteName = metaCache.getRemoteName("local1");
Assert.assertEquals("remote1", remoteName);

Assert.assertNull(metaCache.getRemoteName("nonexistent"));
}

@Test
public void testGetMetaObj() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
metaCache.updateCache("remote2", "local2", "meta2", 2L);

Optional<String> metaObj = metaCache.getMetaObj("local1", 1L);
Assert.assertTrue(metaObj.isPresent());
Assert.assertEquals("meta1", metaObj.get());

Assert.assertFalse(metaCache.getMetaObj("xxx", 2L).isPresent());

}

@Test
public void testGetMetaObjById() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
metaCache.updateCache("remote2", "local2", "meta2", 2L);
metaCache.updateCache("remote3", "local3", "meta3", 1L);

Optional<String> metaObj = metaCache.getMetaObjById(1L);
Assert.assertTrue(metaObj.isPresent());
Assert.assertEquals("meta3", metaObj.get());

Assert.assertFalse(metaCache.getMetaObjById(99L).isPresent());
}

@Test
public void testUpdateCache() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
metaCache.updateCache("remote2", "local2", "meta2", 2L);

List<String> names = metaCache.listNames();
Assert.assertEquals(2, names.size());
Assert.assertTrue(names.contains("local1"));
Assert.assertTrue(names.contains("local2"));

Optional<String> metaObj1 = metaCache.getMetaObj("local1", 1L);
Assert.assertTrue(metaObj1.isPresent());
Assert.assertEquals("meta1", metaObj1.get());

Optional<String> metaObj2 = metaCache.getMetaObj("local2", 2L);
Assert.assertTrue(metaObj2.isPresent());
Assert.assertEquals("meta2", metaObj2.get());
}

@Test
public void testInvalidate() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
metaCache.updateCache("remote2", "local2", "meta2", 2L);

// Invalidate local1 cache
metaCache.invalidate("local1", 1L);

List<String> names = metaCache.listNames();
Assert.assertEquals(1, names.size());
Assert.assertTrue(names.contains("local2"));

Optional<String> metaObj1 = metaCache.getMetaObj("local1", 1L);
Assert.assertFalse(metaObj1.isPresent());

Optional<String> metaObj2 = metaCache.getMetaObj("local2", 2L);
Assert.assertTrue(metaObj2.isPresent());
Assert.assertEquals("meta2", metaObj2.get());
}

@Test
public void testInvalidateAll() {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
metaCache.updateCache("remote2", "local2", "meta2", 2L);

metaCache.invalidateAll();

List<String> names = metaCache.listNames();
Assert.assertTrue(names.isEmpty());

Assert.assertFalse(metaCache.getMetaObj("local1", 1L).isPresent());
Assert.assertFalse(metaCache.getMetaObj("local2", 2L).isPresent());
}

@Test
public void testCacheExpiration() throws InterruptedException {
metaCache.updateCache("remote1", "local1", "meta1", 1L);
Thread.sleep(2000);
Optional<String> metaObj = metaCache.getMetaObj("local1", 1L);
Assert.assertFalse(metaObj.isPresent());
}

@Test
public void testConcurrency() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);

for (int i = 0; i < 10; i++) {
final int id = i;
executorService.submit(() -> {
metaCache.updateCache("remote" + id, "local" + id, "meta" + id, id);
});
}

executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);

for (int i = 0; i < 10; i++) {
Optional<String> metaObj = metaCache.getMetaObj("local" + i, i);
Assert.assertTrue(metaObj.isPresent());
Assert.assertEquals("meta" + i, metaObj.get());
}
}

@Test
public void testMetaObjCacheLoader() throws InterruptedException {

CacheLoader<String, List<Pair<String, String>>> namesCacheLoader = key -> Lists.newArrayList();
CountDownLatch latch = new CountDownLatch(2);
CacheLoader<String, Optional<String>> metaObjCacheLoader = key -> {
latch.countDown();
return Optional.of("meta" + key);
};

RemovalListener<String, Optional<String>> removalListener = (key, value, cause) -> {};

MetaCache<String> testCache = new MetaCache<>(
"testCache",
Executors.newCachedThreadPool(),
OptionalLong.of(1),
OptionalLong.of(1),
100,
namesCacheLoader,
metaObjCacheLoader,
removalListener
);
testCache.getMetaObj("local2", 1L);

Optional<String> metaObj = testCache.getMetaObj("local1", 1L);
Assert.assertTrue(metaObj.isPresent());
Assert.assertEquals("metalocal1", metaObj.get());
latch.await();

}
}

0 comments on commit dec53c4

Please sign in to comment.