Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] remove partition version check in plan validation (backport #46733) #46781

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,6 @@ public enum OlapTableState {

// Record the alter, schema change, MV update time
public AtomicLong lastSchemaUpdateTime = new AtomicLong(-1);
// Record the start and end time for data load version update phase
public AtomicLong lastVersionUpdateStartTime = new AtomicLong(-1);
public AtomicLong lastVersionUpdateEndTime = new AtomicLong(0);

public OlapTable() {
this(TableType.OLAP);
Expand Down Expand Up @@ -1578,9 +1575,6 @@ public void gsonPostProcess() throws IOException {
clusterId = GlobalStateMgr.getCurrentState().getClusterId();

lastSchemaUpdateTime = new AtomicLong(-1);
// Record the start and end time for data load version update phase
lastVersionUpdateStartTime = new AtomicLong(-1);
lastVersionUpdateEndTime = new AtomicLong(0);
}

public OlapTable selectiveCopy(Collection<String> reservedPartitions, boolean resetState, IndexExtState extState) {
Expand Down
41 changes: 41 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/sql/OptimisticVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql;

import com.starrocks.catalog.OlapTable;

/**
* Generate a monotonic version for optimistic lock.
* NOTE: currently we use the nano time, which is usually precise enough for the schema-change and version update
* operations. Previously we use the millisecond time, which is not safe enough.
* TODO: refactor related code to here
*/
public class OptimisticVersion {

/**
* Generate a version
*/
public static long generate() {
return System.nanoTime();
}

/**
* Validate the candidate version
*/
public static boolean validateTableUpdate(OlapTable olapTable, long candidateVersion) {
long schemaUpdate = olapTable.lastSchemaUpdateTime.get();
return schemaUpdate < candidateVersion;
}
}
46 changes: 19 additions & 27 deletions fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.
package com.starrocks.sql;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
Expand All @@ -20,6 +20,7 @@
import com.starrocks.sql.ast.Relation;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.UpdateStmt;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.Optimizer;
Expand All @@ -41,8 +42,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.sql.common.ErrorType.INTERNAL_ERROR;

public class StatementPlanner {

public static ExecPlan plan(StatementBase stmt, ConnectContext session) {
Expand Down Expand Up @@ -158,9 +157,11 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
session.setCurrentSqlDbIds(dbs.values().stream().map(Database::getId).collect(Collectors.toSet()));
// TODO: double check relatedMvs for OlapTable
// only collect once to save the original olapTable info
// the original olapTable in queryStmt had been replaced with the copied olapTable
Set<OlapTable> olapTables = collectOriginalOlapTables(queryStmt, dbs);
long planStartTime = 0;
for (int i = 0; i < Config.max_query_retry_time; ++i) {
long planStartTime = System.nanoTime();
planStartTime = OptimisticVersion.generate();
if (!isSchemaValid) {
colNames = reAnalyzeStmt(queryStmt, dbs, session);
}
Expand All @@ -183,39 +184,30 @@ public static ExecPlan createQueryPlanWithReTry(QueryStatement queryStmt,
}
try (PlannerProfile.ScopedTimer ignored = PlannerProfile.getScopedTimer("ExecPlanBuild")) {
// 3. Build fragment exec plan
/*
* SingleNodeExecPlan is set in TableQueryPlanAction to generate a single-node Plan,
* currently only used in Spark/Flink Connector
* Because the connector sends only simple queries, it only needs to remove the output fragment
*/
// For only olap table queries, we need to lock db here.
// Because we need to ensure multi partition visible versions are consistent.
long buildFragmentStartTime = System.nanoTime();
// SingleNodeExecPlan is set in TableQueryPlanAction to generate a single-node Plan,
// currently only used in Spark/Flink Connector
// Because the connector sends only simple queries, it only needs to remove the output fragment
ExecPlan plan = PlanFragmentBuilder.createPhysicalPlan(
optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory, colNames,
resultSinkType,
!session.getSessionVariable().isSingleNodeExecPlan());
isSchemaValid = olapTables.stream().noneMatch(t -> t.lastSchemaUpdateTime.get() > planStartTime);
isSchemaValid = isSchemaValid && olapTables.stream().allMatch(t ->
t.lastVersionUpdateEndTime.get() < buildFragmentStartTime &&
t.lastVersionUpdateEndTime.get() >= t.lastVersionUpdateStartTime.get());
final long finalPlanStartTime = planStartTime;
isSchemaValid = olapTables.stream().allMatch(t -> OptimisticVersion.validateTableUpdate(t,
finalPlanStartTime));
if (isSchemaValid) {
return plan;
}
}
}

// if exists table is applying visible log, we wait 10 ms to retry
if (olapTables.stream().anyMatch(t -> t.lastVersionUpdateStartTime.get() > t.lastVersionUpdateEndTime.get())) {
try (PlannerProfile.ScopedTimer timer = PlannerProfile.getScopedTimer("PlanRetrySleepTime")) {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new StarRocksPlannerException("query had been interrupted", INTERNAL_ERROR);
}
}
List<String> updatedTables = Lists.newArrayList();
for (OlapTable olapTable : olapTables) {
if (!OptimisticVersion.validateTableUpdate(olapTable, planStartTime)) {
updatedTables.add(olapTable.getName());
}
}
Preconditions.checkState(false, "The tablet write operation update metadata " +
"take a long time");
return null;
throw new StarRocksPlannerException(ErrorType.INTERNAL_ERROR,
"schema of %s had been updated frequently during the plan generation", updatedTables);
}

private static Set<OlapTable> collectOriginalOlapTables(QueryStatement queryStmt, Map<String, Database> dbs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf

long maxPartitionVersionTime = -1;

table.lastVersionUpdateStartTime.set(System.nanoTime());

for (PartitionCommitInfo partitionCommitInfo : commitInfo.getIdToPartitionCommitInfo().values()) {
long partitionId = partitionCommitInfo.getPartitionId();
Partition partition = table.getPartition(partitionId);
Expand Down Expand Up @@ -152,7 +150,6 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
maxPartitionVersionTime = Math.max(maxPartitionVersionTime, versionTime);
}

table.lastVersionUpdateEndTime.set(System.nanoTime());
if (!GlobalStateMgr.isCheckpointThread() && dictCollectedVersions.size() == validDictCacheColumns.size()) {
for (int i = 0; i < validDictCacheColumns.size(); i++) {
String columnName = validDictCacheColumns.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ int getCurrentStateJournalVersion() {
++expectedTabletId;
}

Assert.assertEquals(-1, newLakeTable.lastSchemaUpdateTime.longValue());
Assert.assertEquals(-1, newLakeTable.lastVersionUpdateStartTime.longValue());
Assert.assertEquals(0, newLakeTable.lastVersionUpdateEndTime.longValue());

Assert.assertNull(table.delete(true));
Assert.assertNotNull(table.delete(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
Expand Down Expand Up @@ -64,14 +65,10 @@ public void testPlanStrategy() throws Exception {
String sql = "select * from t0";
OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getMetadataMgr()
.getTable("default_catalog", DB_NAME, "t0");
table.lastVersionUpdateStartTime.set(2);
table.lastVersionUpdateEndTime.set(1);
try {
UtFrameUtils.getPlanAndFragment(connectContext, sql);
} catch (Exception e) {
Assert.assertTrue(e.getMessage(),
e.getMessage().contains("The tablet write operation update metadata take a long time"));
}
table.lastSchemaUpdateTime.set(System.nanoTime() + 10000000000L);
Assert.assertThrows("schema of [t0] had been updated frequently during the plan generation",
StarRocksPlannerException.class, () -> UtFrameUtils.getPlanAndFragment(connectContext, sql));

connectContext.getSessionVariable().setCboUseDBLock(true);
Pair<String, ExecPlan> plan = UtFrameUtils.getPlanAndFragment(connectContext, sql);
Assert.assertTrue(plan.first, plan.first.contains("SCAN"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql;

import com.starrocks.catalog.OlapTable;
import com.starrocks.sql.plan.PlanTestBase;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

class OptimisticVersionTest extends PlanTestBase {

@BeforeAll
public static void beforeAll() throws Exception {
PlanTestBase.beforeClass();
}

@AfterAll
public static void afterAll() {
PlanTestBase.afterClass();
}

@Test
public void testOptimisticVersion() {
OlapTable table = new OlapTable();

// initialized
assertTrue(OptimisticVersion.validateTableUpdate(table, OptimisticVersion.generate()));

// schema change
table.lastSchemaUpdateTime.set(OptimisticVersion.generate());
assertTrue(OptimisticVersion.validateTableUpdate(table, OptimisticVersion.generate()));
}
}
Loading