Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix auto bucket delete job (backport #49125) #49446

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.NoAliveBackendException;
import com.starrocks.common.UserException;
Expand Down Expand Up @@ -85,13 +86,15 @@ public static Map<Long, List<Long>> groupTabletID(Collection<Partition> partitio
throws NoAliveBackendException {
Map<Long, List<Long>> groupMap = new HashMap<>();
for (Partition partition : partitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(indexState)) {
for (Tablet tablet : index.getTablets()) {
Long beId = chooseBackend((LakeTablet) tablet);
if (beId == null) {
throw new NoAliveBackendException("no alive backend");
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : physicalPartition.getMaterializedIndices(indexState)) {
for (Tablet tablet : index.getTablets()) {
Long beId = chooseBackend((LakeTablet) tablet);
if (beId == null) {
throw new NoAliveBackendException("no alive backend");
}
groupMap.computeIfAbsent(beId, k -> Lists.newArrayList()).add(tablet.getId());
}
groupMap.computeIfAbsent(beId, k -> Lists.newArrayList()).add(tablet.getId());
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions fe/fe-core/src/main/java/com/starrocks/leader/LeaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.RandomDistributionInfo;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Replica;
Expand Down Expand Up @@ -533,12 +534,12 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) {
throw new MetaNotFoundException("cannot find table[" + tableId + "] when push finished");
}

Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
PhysicalPartition physicalPartition = olapTable.getPhysicalPartition(partitionId);
if (physicalPartition == null) {
throw new MetaNotFoundException("cannot find partition[" + partitionId + "] when push finished");
}

MaterializedIndex pushIndex = partition.getIndex(pushIndexId);
MaterializedIndex pushIndex = physicalPartition.getIndex(pushIndexId);
if (pushIndex == null) {
// yiguolei: if index is dropped during load, it is not a failure.
// throw exception here and cause the job to cancel the task
Expand Down Expand Up @@ -568,7 +569,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) {
for (int i = 0; i < tabletMetaList.size(); i++) {
TabletMeta tabletMeta = tabletMetaList.get(i);
long tabletId = tabletIds.get(i);
Replica replica = findRelatedReplica(olapTable, partition,
Replica replica = findRelatedReplica(olapTable, physicalPartition,
backendId, tabletId, tabletMeta.getIndexId());
if (replica != null) {
olapDeleteJob.addFinishedReplica(partitionId, pushTabletId, replica);
Expand All @@ -587,7 +588,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) {
checkReplica(finishTabletInfos.get(i), tabletMeta);
long tabletId = tabletIds.get(i);
Replica replica =
findRelatedReplica(olapTable, partition, backendId, tabletId, tabletMeta.getIndexId());
findRelatedReplica(olapTable, physicalPartition, backendId, tabletId, tabletMeta.getIndexId());
// if the replica is under schema change, could not find the replica with aim schema hash
if (replica != null) {
((SparkLoadJob) job).addFinishedReplica(replica.getId(), pushTabletId, backendId);
Expand Down Expand Up @@ -631,7 +632,7 @@ private void checkReplica(TTabletInfo tTabletInfo, TabletMeta tabletMeta)
}
}

private Replica findRelatedReplica(OlapTable olapTable, Partition partition,
private Replica findRelatedReplica(OlapTable olapTable, PhysicalPartition physicalPartition,
long backendId, long tabletId, long indexId)
throws MetaNotFoundException {
// both normal index and rolling up index are in inverted index
Expand All @@ -640,7 +641,7 @@ private Replica findRelatedReplica(OlapTable olapTable, Partition partition,
LOG.warn("tablet[{}] may be dropped. push index[{}]", tabletId, indexId);
return null;
}
MaterializedIndex index = partition.getIndex(indexId);
MaterializedIndex index = physicalPartition.getIndex(indexId);
if (index == null) {
if (olapTable.getState() == OlapTableState.ROLLUP) {
// this happens when:
Expand Down
5 changes: 4 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
Expand Down Expand Up @@ -272,7 +273,9 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
}
partitions.add(partition);
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
partitionReplicaNum.put(partition.getId(), replicationNum);
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
partitionReplicaNum.put(physicalPartition.getId(), replicationNum);
}
}

// check conditions
Expand Down
93 changes: 49 additions & 44 deletions fe/fe-core/src/main/java/com/starrocks/load/OlapDeleteJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -122,57 +123,61 @@ public void run(DeleteStmt stmt, Database db, Table table, List<Partition> parti
// count total replica num
int totalReplicaNum = 0;
for (Partition partition : partitions) {
for (MaterializedIndex index : partition
.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
totalReplicaNum += ((LocalTablet) tablet).getImmutableReplicas().size();
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : physicalPartition
.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
totalReplicaNum += ((LocalTablet) tablet).getImmutableReplicas().size();
}
}
}
}

countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum);

for (Partition partition : partitions) {
for (MaterializedIndex index : partition
.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);

List<TColumn> columnsDesc = new ArrayList<>();
for (Column column : olapTable.getSchemaByIndexId(indexId)) {
columnsDesc.add(column.toThrift());
}
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : physicalPartition
.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);

List<TColumn> columnsDesc = new ArrayList<>();
for (Column column : olapTable.getSchemaByIndexId(indexId)) {
columnsDesc.add(column.toThrift());
}

for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();

// set push type
TPushType type = TPushType.DELETE;

for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
long replicaId = replica.getId();
long backendId = replica.getBackendId();
countDownLatch.addMark(backendId, tabletId);

// create push task for each replica
PushTask pushTask = new PushTask(null,
replica.getBackendId(), db.getId(), olapTable.getId(),
partition.getId(), indexId,
tabletId, replicaId, schemaHash,
-1, 0,
-1, type, conditions,
TPriority.NORMAL,
TTaskType.REALTIME_PUSH,
getTransactionId(),
GlobalStateMgr.getCurrentGlobalTransactionMgr().getTransactionIDGenerator()
.getNextTransactionId(), columnsDesc);
pushTask.setIsSchemaChanging(false);
pushTask.setCountDownLatch(countDownLatch);

if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
addPushTask(pushTask);
addTablet(tabletId);
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();

// set push type
TPushType type = TPushType.DELETE;

for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
long replicaId = replica.getId();
long backendId = replica.getBackendId();
countDownLatch.addMark(backendId, tabletId);

// create push task for each replica
PushTask pushTask = new PushTask(null,
replica.getBackendId(), db.getId(), olapTable.getId(),
physicalPartition.getId(), indexId,
tabletId, replicaId, schemaHash,
-1, 0,
-1, type, conditions,
TPriority.NORMAL,
TTaskType.REALTIME_PUSH,
getTransactionId(),
GlobalStateMgr.getCurrentGlobalTransactionMgr().getTransactionIDGenerator()
.getNextTransactionId(), columnsDesc);
pushTask.setIsSchemaChanging(false);
pushTask.setCountDownLatch(countDownLatch);

if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
addPushTask(pushTask);
addTablet(tabletId);
}
}
}
}
Expand Down Expand Up @@ -410,4 +415,4 @@ protected List<TabletCommitInfo> getTabletCommitInfos() {
protected List<TabletFailInfo> getTabletFailInfos() {
return Collections.emptyList();
}
}
}
54 changes: 53 additions & 1 deletion test/sql/test_automatic_bucket/R/test_automatic_partition
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ PROPERTIES (
"compression" = "LZ4"
);
-- !result

-- name: test_invalid_bucket_size
create table t0(k int) properties('bucket_size'='0');
-- result:
Expand All @@ -57,6 +58,7 @@ E: (5064, 'Getting analyzing error. Detail message: Illegal bucket size: -1.')
alter table t set('bucket_size'='2048');
-- result:
-- !result

-- name: test_automatic_bucket
create database kkk;
-- result:
Expand Down Expand Up @@ -88,6 +90,7 @@ select count(*) from information_schema.be_tablets t1, information_schema.tables
alter table t set('bucket_size'='1');
-- result:
-- !result

-- name: test_range_partition @sequential
create database ttt;
-- result:
Expand Down Expand Up @@ -146,6 +149,7 @@ select * from t;
2021-01-01 1
2021-01-05 1
-- !result

-- name: test_list_partition @sequential
create database ddd;
-- result:
Expand Down Expand Up @@ -204,6 +208,7 @@ select * from t;
2021-01-01 1
2021-01-03 1
-- !result

-- name: test_expr_partition @sequential
create database eee;
-- result:
Expand Down Expand Up @@ -257,6 +262,7 @@ select * from t;
2021-01-01 1
2021-01-01 1
-- !result

-- name: test_schema_change @sequential
create table t(k date, v int) DUPLICATE KEY(k) PARTITION BY DATE_TRUNC('DAY', `k`)
PROPERTIES (
Expand Down Expand Up @@ -360,6 +366,7 @@ select * from t;
2021-01-05 None 1
2021-01-05 None 2
-- !result

-- name: test_mv @sequential
create table t(k date, v int, v1 int) DUPLICATE KEY(k) PARTITION BY DATE_TRUNC('DAY', `k`)
PROPERTIES (
Expand Down Expand Up @@ -440,4 +447,49 @@ select k, v1 from t;
2021-01-01 3
2021-01-03 1
2021-01-03 2
-- !result
-- !result

-- name: test_delete
create table t(k int, v int)
PROPERTIES (
"replication_num" = "1",
"bucket_size" = "1"
);
-- result:
[]
-- !result
insert into t values(1,1);
-- result:
[]
-- !result
insert into t values(1,2);
-- result:
[]
-- !result
insert into t values(1,3);
-- result:
[]
-- !result
insert into t values(1,4);
-- result:
[]
-- !result
insert into t values(1,5);
-- result:
[]
-- !result
select * from t;
-- result:
1 2
1 3
1 4
1 5
1 1
-- !result
delete from t where k = 1;
-- result:
[]
-- !result
select * from t;
-- result:
-- !result
20 changes: 19 additions & 1 deletion test/sql/test_automatic_bucket/T/test_automatic_partition
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,22 @@ select * from t;

create materialized view mv as select k, v1 from t;
function: wait_alter_table_finish("rollup", 8)
select k, v1 from t;
select k, v1 from t;


-- name: test_delete
create table t(k int, v int)
PROPERTIES (
"replication_num" = "1",
"bucket_size" = "1"
);

insert into t values(1,1);
insert into t values(1,2);
insert into t values(1,3);
insert into t values(1,4);
insert into t values(1,5);
select * from t;

delete from t where k = 1;
select * from t;
3 changes: 3 additions & 0 deletions test/sql/test_load_profile/T/test_load_profile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- name: test_table_profile
create table t(k int, v int);

Loading