Skip to content

Commit

Permalink
[Enhancement] Support Gson subtype rollback (backport #50471) (#51166)
Browse files Browse the repository at this point in the history
Signed-off-by: gengjun-git <[email protected]>
Co-authored-by: gengjun-git <[email protected]>
  • Loading branch information
mergify[bot] and gengjun-git authored Sep 20, 2024
1 parent 92f91df commit 0c2c01e
Show file tree
Hide file tree
Showing 47 changed files with 918 additions and 1,238 deletions.
21 changes: 4 additions & 17 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import com.starrocks.persist.ModifyTablePropertyOperationLog;
import com.starrocks.persist.RenameMaterializedViewLog;
import com.starrocks.persist.SwapTableOperationLog;
import com.starrocks.persist.gson.IForwardCompatibleObject;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockID;
Expand Down Expand Up @@ -597,13 +596,7 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti
}

public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
int schemaChangeJobSize = reader.readInt();
for (int i = 0; i != schemaChangeJobSize; ++i) {
AlterJobV2 alterJobV2 = reader.readJson(AlterJobV2.class);
if (alterJobV2 instanceof IForwardCompatibleObject) {
LOG.warn("Ignore unknown alterJobV2(id: {}) from the future version!", alterJobV2.getJobId());
continue;
}
reader.readCollection(AlterJobV2.class, alterJobV2 -> {
schemaChangeHandler.addAlterJobV2(alterJobV2);

// ATTN : we just want to add tablet into TabletInvertedIndex when only PendingJob is checkpoint
Expand All @@ -613,15 +606,9 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept
alterJobV2.replay(alterJobV2);
LOG.info("replay pending alter job when load alter job {} ", alterJobV2.getJobId());
}
}
});

int materializedViewJobSize = reader.readInt();
for (int i = 0; i != materializedViewJobSize; ++i) {
AlterJobV2 alterJobV2 = reader.readJson(AlterJobV2.class);
if (alterJobV2 instanceof IForwardCompatibleObject) {
LOG.warn("Ignore unknown MV job(id: {}) from the future version!", alterJobV2.getJobId());
continue;
}
reader.readCollection(AlterJobV2.class, alterJobV2 -> {
materializedViewHandler.addAlterJobV2(alterJobV2);

// ATTN : we just want to add tablet into TabletInvertedIndex when only PendingJob is checkpoint
Expand All @@ -631,6 +618,6 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept
alterJobV2.replay(alterJobV2);
LOG.info("replay pending alter job when load alter job {} ", alterJobV2.getJobId());
}
}
});
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.starrocks.mysql.MysqlPassword;
import com.starrocks.mysql.privilege.AuthPlugin;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.metablock.MapEntryConsumer;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockID;
Expand Down Expand Up @@ -650,24 +651,22 @@ public void saveV2(ImageWriter imageWriter) throws IOException {
}

public void loadV2(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
AuthenticationMgr ret = null;
try {
// 1 json for myself
ret = reader.readJson(AuthenticationMgr.class);
ret.userToAuthenticationInfo = new UserAuthInfoTreeMap();
// 1 json for num user
int numUser = reader.readInt();
LOG.info("loading {} users", numUser);
for (int i = 0; i != numUser; ++i) {
// 2 json for each user(kv)
UserIdentity userIdentity = reader.readJson(UserIdentity.class);
UserAuthenticationInfo userAuthenticationInfo = reader.readJson(UserAuthenticationInfo.class);
userAuthenticationInfo.analyze();
ret.userToAuthenticationInfo.put(userIdentity, userAuthenticationInfo);
}
} catch (AuthenticationException e) {
throw new RuntimeException(e);
}
// 1 json for myself
AuthenticationMgr ret = reader.readJson(AuthenticationMgr.class);
ret.userToAuthenticationInfo = new UserAuthInfoTreeMap();

LOG.info("loading users");
reader.readMap(UserIdentity.class, UserAuthenticationInfo.class,
(MapEntryConsumer<UserIdentity, UserAuthenticationInfo>) (userIdentity, userAuthenticationInfo) -> {
try {
userAuthenticationInfo.analyze();
} catch (AuthenticationException e) {
throw new IOException(e);
}

ret.userToAuthenticationInfo.put(userIdentity, userAuthenticationInfo);
});

LOG.info("loaded {} users", ret.userToAuthenticationInfo.size());

// mark data is loaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,17 +714,16 @@ public void saveBackupHandlerV2(ImageWriter imageWriter) throws IOException, SRM
public void loadBackupHandlerV2(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
BackupHandler data = reader.readJson(BackupHandler.class);
this.repoMgr = data.repoMgr;
int size = reader.readInt();

long currentTimeMs = System.currentTimeMillis();
while (size-- > 0) {
AbstractJob job = reader.readJson(AbstractJob.class);
reader.readCollection(AbstractJob.class, job -> {
if (isJobExpired(job, currentTimeMs)) {
LOG.warn("skip expired job {}", job);
continue;
return;
}
dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
mvRestoreContext.addIntoMvBaseTableBackupInfo(job);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.RecoverInfo;
import com.starrocks.persist.gson.IForwardCompatibleObject;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockID;
Expand Down Expand Up @@ -1064,30 +1063,18 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti
}

public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
int idToDatabaseSize = reader.readInt();
for (int i = 0; i < idToDatabaseSize; ++i) {
RecycleDatabaseInfo recycleDatabaseInfo = reader.readJson(RecycleDatabaseInfo.class);
reader.readCollection(RecycleDatabaseInfo.class, recycleDatabaseInfo -> {
idToDatabase.put(recycleDatabaseInfo.db.getId(), recycleDatabaseInfo);
}
});

int idToTableInfoSize = reader.readInt();
for (int i = 0; i < idToTableInfoSize; ++i) {
RecycleTableInfo recycleTableInfo = reader.readJson(RecycleTableInfo.class);
reader.readCollection(RecycleTableInfo.class, recycleTableInfo -> {
idToTableInfo.put(recycleTableInfo.dbId, recycleTableInfo.table.getId(), recycleTableInfo);
nameToTableInfo.put(recycleTableInfo.getDbId(), recycleTableInfo.getTable().getName(), recycleTableInfo);
}
});

int idToPartitionSize = reader.readInt();
for (int i = 0; i < idToPartitionSize; ++i) {
RecyclePartitionInfo recyclePartitionInfo = reader.readJson(RecyclePartitionInfoV2.class);
if (recyclePartitionInfo instanceof IForwardCompatibleObject) {
// Ignore the future unknown subtype derived from RecyclePartitionInfoV2
LOG.warn("Ignore unknown partition type(partitionId: {}) from the future version!",
recyclePartitionInfo.getPartition().getId());
continue;
}
reader.readCollection(RecyclePartitionInfoV2.class, recyclePartitionInfo -> {
idToPartition.put(recyclePartitionInfo.partition.getId(), recyclePartitionInfo);
}
});

idToRecycleTime = (Map<Long, Long>) reader.readJson(new TypeToken<Map<Long, Long>>() {
}.getType());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,7 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti
}

public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
int numJson = reader.readInt();
for (int i = 0; i < numJson; ++i) {
Function function = reader.readJson(Function.class);
replayAddFunction(function);
}
reader.readCollection(Function.class, this::replayAddFunction);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -695,12 +695,8 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti
}

public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
int numJson = reader.readInt();
List<ResourceGroup> resourceGroups = new ArrayList<>();
for (int i = 0; i < numJson; ++i) {
ResourceGroup resourceGroup = reader.readJson(ResourceGroup.class);
resourceGroups.add(resourceGroup);
}
reader.readCollection(ResourceGroup.class, resourceGroups::add);
resourceGroups.sort(Comparator.comparing(ResourceGroup::getVersion));
resourceGroups.forEach(this::replayAddResourceGroup);
}
Expand Down
27 changes: 18 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int edit_log_write_slow_log_threshold_ms = 2000;

/**
* whether ignore unknown log id
* when fe rolls back to low version, there may be log id that low version fe can not recognise
* if set to true, fe will ignore those id
* or fe will exit
*/
@ConfField(mutable = true)
public static boolean ignore_unknown_log_id = false;

/**
* hdfs_read_buffer_size_kb for reading hdfs
*/
Expand Down Expand Up @@ -2754,6 +2745,24 @@ public class Config extends ConfigBase {
@ConfField
public static boolean metadata_enable_recovery_mode = false;

/**
* Whether ignore unknown log id
* when FE rolls back to low version, there may be log id that low version FE can not recognise
* if set to true, FE will ignore those ids
* or FE will exit
*/
@ConfField(mutable = true, aliases = {"ignore_unknown_log_id"})
public static boolean metadata_ignore_unknown_operation_type = false;

/**
* Whether ignore unknown subtype
* when FE rolls back to low version, there may be classes recorded in meta FE can not recognise
* if set to true, FE will ignore the unknown subtype
* or FE will exit
*/
@ConfField
public static boolean metadata_ignore_unknown_subtype = false;

/**
* Number of profile infos reserved by `ProfileManager` for recently executed query.
* Default value: 500
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,7 @@ public long loadSmallFiles(DataInputStream in, long checksum) throws IOException
}

public void loadSmallFilesV2(SRMetaBlockReader reader) throws IOException, SRMetaBlockEOFException, SRMetaBlockException {
int size = reader.readInt();
while (size-- > 0) {
SmallFile smallFile = reader.readJson(SmallFile.class);
putToFiles(smallFile);
}
reader.readCollection(SmallFile.class, this::putToFiles);
}

private void putToFiles(SmallFile smallFile) {
Expand Down
9 changes: 4 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/encryption/KeyMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,12 @@ public EncryptionKey create(EncryptionKeyPB pb) {
public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
keysLock.writeLock().lock();
try {
int cnt = reader.readInt();
LOG.info("loading {} keys", cnt);
for (int i = 0; i < cnt; i++) {
EncryptionKeyPB pb = reader.readJson(EncryptionKeyPB.class);
reader.readCollection(EncryptionKeyPB.class, pb -> {
EncryptionKey key = create(pb);
idToKey.put(key.id, key);
}
});
LOG.info("loaded {} keys", idToKey.size());

if (MetricRepo.hasInit) {
MetricRepo.GAUGE_ENCRYPTION_KEY_NUM.setValue((long) idToKey.size());
}
Expand Down
Loading

0 comments on commit 0c2c01e

Please sign in to comment.