diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java index 03675c52682e4..4d9d99c8f2054 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java @@ -192,9 +192,6 @@ public enum OlapTableState { // Record the alter, schema change, MV update time public AtomicLong lastSchemaUpdateTime = new AtomicLong(-1); - // Record the start and end time for data load version update phase - public AtomicLong lastVersionUpdateStartTime = new AtomicLong(-1); - public AtomicLong lastVersionUpdateEndTime = new AtomicLong(0); public OlapTable() { this(TableType.OLAP); @@ -1578,9 +1575,6 @@ public void gsonPostProcess() throws IOException { clusterId = GlobalStateMgr.getCurrentState().getClusterId(); lastSchemaUpdateTime = new AtomicLong(-1); - // Record the start and end time for data load version update phase - lastVersionUpdateStartTime = new AtomicLong(-1); - lastVersionUpdateEndTime = new AtomicLong(0); } public OlapTable selectiveCopy(Collection reservedPartitions, boolean resetState, IndexExtState extState) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/OptimisticVersion.java b/fe/fe-core/src/main/java/com/starrocks/sql/OptimisticVersion.java new file mode 100644 index 0000000000000..616c7cc4613df --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/OptimisticVersion.java @@ -0,0 +1,41 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql; + +import com.starrocks.catalog.OlapTable; + +/** + * Generate a monotonic version for optimistic lock. + * NOTE: currently we use the nano time, which is usually precise enough for the schema-change and version update + * operations. Previously we use the millisecond time, which is not safe enough. + * TODO: refactor related code to here + */ +public class OptimisticVersion { + + /** + * Generate a version + */ + public static long generate() { + return System.nanoTime(); + } + + /** + * Validate the candidate version + */ + public static boolean validateTableUpdate(OlapTable olapTable, long candidateVersion) { + long schemaUpdate = olapTable.lastSchemaUpdateTime.get(); + return schemaUpdate < candidateVersion; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java index 87d307f367590..f76f19e69231d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java @@ -1,7 +1,7 @@ // This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc. package com.starrocks.sql; -import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.starrocks.catalog.Database; import com.starrocks.catalog.OlapTable; @@ -20,6 +20,7 @@ import com.starrocks.sql.ast.Relation; import com.starrocks.sql.ast.StatementBase; import com.starrocks.sql.ast.UpdateStmt; +import com.starrocks.sql.common.ErrorType; import com.starrocks.sql.common.StarRocksPlannerException; import com.starrocks.sql.optimizer.OptExpression; import com.starrocks.sql.optimizer.Optimizer; @@ -41,8 +42,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static com.starrocks.sql.common.ErrorType.INTERNAL_ERROR; - public class StatementPlanner { public static ExecPlan plan(StatementBase stmt, ConnectContext session) { @@ -158,9 +157,11 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt, session.setCurrentSqlDbIds(dbs.values().stream().map(Database::getId).collect(Collectors.toSet())); // TODO: double check relatedMvs for OlapTable // only collect once to save the original olapTable info + // the original olapTable in queryStmt had been replaced with the copied olapTable Set olapTables = collectOriginalOlapTables(queryStmt, dbs); + long planStartTime = 0; for (int i = 0; i < Config.max_query_retry_time; ++i) { - long planStartTime = System.nanoTime(); + planStartTime = OptimisticVersion.generate(); if (!isSchemaValid) { colNames = reAnalyzeStmt(queryStmt, dbs, session); } @@ -183,39 +184,30 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt, } try (PlannerProfile.ScopedTimer ignored = PlannerProfile.getScopedTimer("ExecPlanBuild")) { // 3. Build fragment exec plan - /* - * SingleNodeExecPlan is set in TableQueryPlanAction to generate a single-node Plan, - * currently only used in Spark/Flink Connector - * Because the connector sends only simple queries, it only needs to remove the output fragment - */ - // For only olap table queries, we need to lock db here. - // Because we need to ensure multi partition visible versions are consistent. - long buildFragmentStartTime = System.nanoTime(); + // SingleNodeExecPlan is set in TableQueryPlanAction to generate a single-node Plan, + // currently only used in Spark/Flink Connector + // Because the connector sends only simple queries, it only needs to remove the output fragment ExecPlan plan = PlanFragmentBuilder.createPhysicalPlan( optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory, colNames, resultSinkType, !session.getSessionVariable().isSingleNodeExecPlan()); - isSchemaValid = olapTables.stream().noneMatch(t -> t.lastSchemaUpdateTime.get() > planStartTime); - isSchemaValid = isSchemaValid && olapTables.stream().allMatch(t -> - t.lastVersionUpdateEndTime.get() < buildFragmentStartTime && - t.lastVersionUpdateEndTime.get() >= t.lastVersionUpdateStartTime.get()); + final long finalPlanStartTime = planStartTime; + isSchemaValid = olapTables.stream().allMatch(t -> OptimisticVersion.validateTableUpdate(t, + finalPlanStartTime)); if (isSchemaValid) { return plan; } + } + } - // if exists table is applying visible log, we wait 10 ms to retry - if (olapTables.stream().anyMatch(t -> t.lastVersionUpdateStartTime.get() > t.lastVersionUpdateEndTime.get())) { - try (PlannerProfile.ScopedTimer timer = PlannerProfile.getScopedTimer("PlanRetrySleepTime")) { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new StarRocksPlannerException("query had been interrupted", INTERNAL_ERROR); - } - } + List updatedTables = Lists.newArrayList(); + for (OlapTable olapTable : olapTables) { + if (!OptimisticVersion.validateTableUpdate(olapTable, planStartTime)) { + updatedTables.add(olapTable.getName()); } } - Preconditions.checkState(false, "The tablet write operation update metadata " + - "take a long time"); - return null; + throw new StarRocksPlannerException(ErrorType.INTERNAL_ERROR, + "schema of %s had been updated frequently during the plan generation", updatedTables); } private static Set collectOriginalOlapTables(QueryStatement queryStmt, Map dbs) { diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java b/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java index 0ba844d8d467d..5f697943b2ae2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java @@ -63,8 +63,6 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf long maxPartitionVersionTime = -1; - table.lastVersionUpdateStartTime.set(System.nanoTime()); - for (PartitionCommitInfo partitionCommitInfo : commitInfo.getIdToPartitionCommitInfo().values()) { long partitionId = partitionCommitInfo.getPartitionId(); Partition partition = table.getPartition(partitionId); @@ -152,7 +150,6 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf maxPartitionVersionTime = Math.max(maxPartitionVersionTime, versionTime); } - table.lastVersionUpdateEndTime.set(System.nanoTime()); if (!GlobalStateMgr.isCheckpointThread() && dictCollectedVersions.size() == validDictCacheColumns.size()) { for (int i = 0; i < validDictCacheColumns.size(); i++) { String columnName = validDictCacheColumns.get(i); diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/LakeTableTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/LakeTableTest.java index e576d58cc63ce..5a9d2890244f3 100644 --- a/fe/fe-core/src/test/java/com/starrocks/lake/LakeTableTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/lake/LakeTableTest.java @@ -120,10 +120,6 @@ int getCurrentStateJournalVersion() { ++expectedTabletId; } - Assert.assertEquals(-1, newLakeTable.lastSchemaUpdateTime.longValue()); - Assert.assertEquals(-1, newLakeTable.lastVersionUpdateStartTime.longValue()); - Assert.assertEquals(0, newLakeTable.lastVersionUpdateEndTime.longValue()); - Assert.assertNull(table.delete(true)); Assert.assertNotNull(table.delete(false)); } diff --git a/fe/fe-core/src/test/java/com/starrocks/planner/QueryPlanLockFreeTest.java b/fe/fe-core/src/test/java/com/starrocks/planner/QueryPlanLockFreeTest.java index fd20a45d52985..a6a89a14f0891 100644 --- a/fe/fe-core/src/test/java/com/starrocks/planner/QueryPlanLockFreeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/planner/QueryPlanLockFreeTest.java @@ -20,6 +20,7 @@ import com.starrocks.ha.FrontendNodeType; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.sql.common.StarRocksPlannerException; import com.starrocks.sql.plan.ExecPlan; import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; @@ -64,14 +65,10 @@ public void testPlanStrategy() throws Exception { String sql = "select * from t0"; OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getMetadataMgr() .getTable("default_catalog", DB_NAME, "t0"); - table.lastVersionUpdateStartTime.set(2); - table.lastVersionUpdateEndTime.set(1); - try { - UtFrameUtils.getPlanAndFragment(connectContext, sql); - } catch (Exception e) { - Assert.assertTrue(e.getMessage(), - e.getMessage().contains("The tablet write operation update metadata take a long time")); - } + table.lastSchemaUpdateTime.set(System.nanoTime() + 10000000000L); + Assert.assertThrows("schema of [t0] had been updated frequently during the plan generation", + StarRocksPlannerException.class, () -> UtFrameUtils.getPlanAndFragment(connectContext, sql)); + connectContext.getSessionVariable().setCboUseDBLock(true); Pair plan = UtFrameUtils.getPlanAndFragment(connectContext, sql); Assert.assertTrue(plan.first, plan.first.contains("SCAN")); diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/OptimisticVersionTest.java b/fe/fe-core/src/test/java/com/starrocks/sql/OptimisticVersionTest.java new file mode 100644 index 0000000000000..65e6cec75d97d --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/sql/OptimisticVersionTest.java @@ -0,0 +1,48 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql; + +import com.starrocks.catalog.OlapTable; +import com.starrocks.sql.plan.PlanTestBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class OptimisticVersionTest extends PlanTestBase { + + @BeforeAll + public static void beforeAll() throws Exception { + PlanTestBase.beforeClass(); + } + + @AfterAll + public static void afterAll() { + PlanTestBase.afterClass(); + } + + @Test + public void testOptimisticVersion() { + OlapTable table = new OlapTable(); + + // initialized + assertTrue(OptimisticVersion.validateTableUpdate(table, OptimisticVersion.generate())); + + // schema change + table.lastSchemaUpdateTime.set(OptimisticVersion.generate()); + assertTrue(OptimisticVersion.validateTableUpdate(table, OptimisticVersion.generate())); + } +} \ No newline at end of file