Skip to content

Commit

Permalink
[improvement](sync version) fe sync version with be (apache#25236)
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 authored Oct 16, 2023
1 parent e9157a3 commit f9a80ec
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 20 deletions.
36 changes: 30 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.thrift.TUniqueId;

import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -114,6 +115,14 @@ public enum ReplicaStatus {
private TUniqueId cooldownMetaId;
private long cooldownTerm = -1;

// A replica version should increase monotonically,
// but backend may missing some versions due to disk failure or bugs.
// FE should found these and mark the replica as missing versions.
// If backend's report version < fe version, record the backend's report version as `regressiveVersion`,
// and if time exceed 5min, fe should mark this replica as missing versions.
private long regressiveVersion = -1;
private long regressiveVersionTimestamp = 0;

/*
* This can happen when this replica is created by a balance clone task, and
* when task finished, the version of this replica is behind the partition's visible version.
Expand Down Expand Up @@ -435,9 +444,9 @@ private void updateReplicaInfo(long newVersion,

if (lastFailedVersion != this.lastFailedVersion) {
// Case 2:
if (lastFailedVersion > this.lastFailedVersion) {
if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) {
this.lastFailedVersion = lastFailedVersion;
this.lastFailedTimestamp = System.currentTimeMillis();
this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L;
}

this.lastSuccessVersion = this.version;
Expand Down Expand Up @@ -506,10 +515,6 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) {
return true;
}

public void setLastFailedVersion(long lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
}

public void setState(ReplicaState replicaState) {
this.state = replicaState;
}
Expand All @@ -534,6 +539,25 @@ public void setVersionCount(long versionCount) {
this.versionCount = versionCount;
}

public boolean checkVersionRegressive(long newVersion) {
if (newVersion >= version) {
regressiveVersion = -1;
regressiveVersionTimestamp = -1;
return false;
}

if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) {
return true;
}

if (newVersion != regressiveVersion) {
regressiveVersion = newVersion;
regressiveVersionTimestamp = System.currentTimeMillis();
}

return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L;
}

@Override
public String toString() {
StringBuilder strBuffer = new StringBuilder("[replicaId=");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,22 @@ private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) {
if (backendTabletInfo.getVersion() > versionInFe) {
// backend replica's version is larger or newer than replica in FE, sync it.
return true;
} else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) {
} else if (versionInFe == backendTabletInfo.getVersion()) {
// backend replica's version is equal to replica in FE, but replica in FE is bad,
// while backend replica is good, sync it
return true;
if (replicaInFe.isBad()) {
return true;
}

// FE' s replica last failed version > partition's committed version
// this can be occur when be report miss version, fe will set last failed version = visible version + 1
// then last failed version may greater than partition's committed version
//
// But here cannot got variable partition, we just check lastFailedVersion = version + 1,
// In ReportHandler.sync, we will check if last failed version > partition's committed version again.
if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
return true;
}
}

return false;
Expand Down Expand Up @@ -501,6 +513,12 @@ private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo
// so we only return true if version_miss is true.
return true;
}

// backend versions regressive due to bugs
if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,13 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)

replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(),
reportedTablet.getDataSize(), reportedTablet.getRowCount());
if (replica.getLastFailedVersion() > partition.getCommittedVersion()
&& reportedTablet.getVersion() >= partition.getCommittedVersion()
//&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss()
&& !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) {
LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId);
replica.updateLastFailedVersion(-1L);
}
if (reportedTablet.isSetPathHash()) {
replica.setPathHash(reportedTablet.getPathHash());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
finishRecoverTablet(task);
break;
case ALTER:
finishAlterTask(task);
finishAlterTask(task, request);
break;
case ALTER_INVERTED_INDEX:
finishAlterInvertedIndexTask(task, request);
Expand Down Expand Up @@ -575,7 +575,7 @@ public TMasterResult report(TReportRequest request) throws TException {
return reportHandler.handleReport(request);
}

private void finishAlterTask(AgentTask task) {
private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
AlterReplicaTask alterTask = (AlterReplicaTask) task;
try {
if (alterTask.getJobType() == JobType.ROLLUP) {
Expand All @@ -584,6 +584,11 @@ private void finishAlterTask(AgentTask task) {
Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
}
alterTask.setFinished(true);
if (request.isSetReportVersion()) {
long reportVersion = request.getReportVersion();
Env.getCurrentSystemInfo().updateBackendReportVersion(
task.getBackendId(), reportVersion, task.getDbId(), task.getTableId());
}
} catch (MetaNotFoundException e) {
LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ private static void diffResource(List<TStorageResource> storageResourcesInBe, Li
}
}

private static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
// public for fe ut
public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
long start = System.currentTimeMillis();
LOG.info("backend[{}] reports {} tablet(s). report version: {}",
backendId, backendTablets.size(), backendReportVersion);
Expand Down Expand Up @@ -607,6 +608,11 @@ private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, L
if (olapTable == null || !olapTable.writeLockIfExist()) {
continue;
}

if (backendReportVersion < Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
break;
}

try {
long partitionId = tabletMeta.getPartitionId();
Partition partition = olapTable.getPartition(partitionId);
Expand Down Expand Up @@ -660,14 +666,25 @@ private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, L
continue;
}

if (metaVersion < backendVersion
|| (metaVersion == backendVersion && replica.isBad())) {

if (backendReportVersion < Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId)) {
continue;
boolean needSync = false;
if (metaVersion < backendVersion) {
needSync = true;
} else if (metaVersion == backendVersion) {
if (replica.isBad()) {
needSync = true;
}
if (replica.getVersion() >= partition.getCommittedVersion()
&& replica.getLastFailedVersion() > partition.getCommittedVersion()) {
LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed"
+ " version change to -1 because last failed version > replica's committed"
+ " version {}",
replica, tabletId, backendId, dbId, partition.getCommittedVersion());
replica.updateLastFailedVersion(-1L);
needSync = true;
}
}

if (needSync) {
// happens when
// 1. PUSH finished in BE but failed or not yet report to FE
// 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
Expand Down Expand Up @@ -1048,18 +1065,25 @@ private static void handleRecoverTablet(ListMultimap<Long, Long> tabletRecoveryM
break;
}

if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss())
|| replica.checkVersionRegressive(tTabletInfo.getVersion())) {
// If the origin last failed version is larger than 0, not change it.
// Otherwise, we set last failed version to replica'version + 1.
// Because last failed version should always larger than replica's version.
long newLastFailedVersion = replica.getLastFailedVersion();
if (newLastFailedVersion < 0) {
newLastFailedVersion = replica.getVersion() + 1;
replica.updateLastFailedVersion(newLastFailedVersion);
LOG.warn("set missing version for replica {} of tablet {} on backend {}, "
+ "version in fe {}, version in be {}, be missing {}",
replica.getId(), tabletId, backendId, replica.getVersion(),
tTabletInfo.getVersion(), tTabletInfo.isVersionMiss());
}
replica.updateLastFailedVersion(newLastFailedVersion);
backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
break;
}

break;
}
}
} finally {
Expand Down
Loading

0 comments on commit f9a80ec

Please sign in to comment.