Skip to content

Commit

Permalink
[fix](mtmv) fix mtmv deadlock issue (#43376) (#43428)
Browse files Browse the repository at this point in the history
pick: #43376
  • Loading branch information
zddr authored Nov 16, 2024
1 parent 8fcce4f commit 6fe00c7
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
*
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -3010,8 +3011,11 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
readLock();
public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException(
"get table read lock timeout, database=" + getQualifiedDbName() + ",table=" + getName());
}
try {
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An

// get info
List<Pair<List<Comparable>, TRow>> partitionInfos = new ArrayList<Pair<List<Comparable>, TRow>>();
Map<Long, List<String>> partitionsUnSyncTables = null;
String mtmvPartitionSyncErrorMsg = null;
if (olapTable instanceof MTMV) {
try {
partitionsUnSyncTables = MTMVPartitionUtil
.getPartitionsUnSyncTables((MTMV) olapTable);
} catch (AnalysisException e) {
mtmvPartitionSyncErrorMsg = e.getMessage();
}
}
olapTable.readLock();
try {
List<Long> partitionIds;
Expand All @@ -258,16 +268,6 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An
}

Joiner joiner = Joiner.on(", ");
Map<Long, List<String>> partitionsUnSyncTables = null;
String mtmvPartitionSyncErrorMsg = null;
if (olapTable instanceof MTMV) {
try {
partitionsUnSyncTables = MTMVPartitionUtil
.getPartitionsUnSyncTables((MTMV) olapTable, partitionIds);
} catch (AnalysisException e) {
mtmvPartitionSyncErrorMsg = e.getMessage();
}
}
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);

Expand Down Expand Up @@ -363,11 +363,16 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An
if (StringUtils.isEmpty(mtmvPartitionSyncErrorMsg)) {
List<String> partitionUnSyncTables = partitionsUnSyncTables.getOrDefault(partitionId,
Lists.newArrayList());
boolean isSync = CollectionUtils.isEmpty(partitionUnSyncTables);
boolean isSync = partitionsUnSyncTables.containsKey(partitionId) && CollectionUtils.isEmpty(
partitionUnSyncTables);
partitionInfo.add(isSync);
trow.addToColumnValue(new TCell().setBoolVal(isSync));
partitionInfo.add(partitionUnSyncTables.toString());
trow.addToColumnValue(new TCell().setStringVal(partitionUnSyncTables.toString()));
// The calculation logic of partitionsUnSyncTables is not protected in the current lock,
// so the obtained partition list may not be consistent with here
String unSyncTables = partitionsUnSyncTables.containsKey(partitionId)
? partitionUnSyncTables.toString() : "not sure, please try again";
partitionInfo.add(unSyncTables);
trow.addToColumnValue(new TCell().setStringVal(unSyncTables));
} else {
partitionInfo.add(false);
trow.addToColumnValue(new TCell().setBoolVal(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ public static boolean isMTMVSync(MTMVRefreshContext context, Set<BaseTableInfo>
* getPartitionsUnSyncTables
*
* @param mtmv
* @param partitionIds
* @return partitionName ==> UnSyncTableNames
* @throws AnalysisException
*/
public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds)
public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv)
throws AnalysisException {
List<Long> partitionIds = mtmv.getPartitionIds();
Map<Long, List<String>> res = Maps.newHashMap();
MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
for (Long partitionId : partitionIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface MTMVRelatedTableIf extends TableIf {
*
* @return partitionName->PartitionItem
*/
Map<String, PartitionItem> getAndCopyPartitionItems();
Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException;

/**
* getPartitionType LIST/RANGE/UNPARTITIONED
Expand Down

0 comments on commit 6fe00c7

Please sign in to comment.