Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered storage #9071

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.tieredstore.common;

import java.util.List;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;

/**
 * Mutable holder for the state of a single group commit: the end offset recorded for the
 * batch, the selected message buffers, and the dispatch requests that belong to it.
 *
 * <p>Call {@link #release()} once the context is no longer needed so that the held buffers
 * are released and the lists are dropped.
 */
public class GroupCommitContext {

    /** End offset recorded for this commit batch. */
    private long endOffset;

    /** Message buffers held by this batch; {@code null} when unset or after {@link #release()}. */
    private List<SelectMappedBufferResult> bufferList;

    /** Dispatch requests of this batch; {@code null} when unset or after {@link #release()}. */
    private List<DispatchRequest> dispatchRequests;

    /**
     * @return the end offset recorded for this batch
     */
    public long getEndOffset() {
        return this.endOffset;
    }

    /**
     * @param endOffset the end offset to record for this batch
     */
    public void setEndOffset(long endOffset) {
        this.endOffset = endOffset;
    }

    /**
     * @return the buffer list as stored (no copy is made), or {@code null} if none is set
     */
    public List<SelectMappedBufferResult> getBufferList() {
        return this.bufferList;
    }

    /**
     * @param bufferList the buffers to hold; the reference is stored as-is
     */
    public void setBufferList(List<SelectMappedBufferResult> bufferList) {
        this.bufferList = bufferList;
    }

    /**
     * @return the dispatch request list as stored (no copy is made), or {@code null} if none is set
     */
    public List<DispatchRequest> getDispatchRequests() {
        return this.dispatchRequests;
    }

    /**
     * @param dispatchRequests the dispatch requests to hold; the reference is stored as-is
     */
    public void setDispatchRequests(List<DispatchRequest> dispatchRequests) {
        this.dispatchRequests = dispatchRequests;
    }

    /**
     * Releases every held buffer, clears both lists and resets both fields to {@code null}.
     * Calling it again afterwards is a no-op because both fields are then {@code null}.
     */
    public void release() {
        releaseBuffers();
        dropDispatchRequests();
    }

    /** Releases each buffer in the list, then clears and forgets the list. */
    private void releaseBuffers() {
        final List<SelectMappedBufferResult> buffers = this.bufferList;
        if (buffers == null) {
            return;
        }
        for (SelectMappedBufferResult buffer : buffers) {
            buffer.release();
        }
        buffers.clear();
        this.bufferList = null;
    }

    /** Clears and forgets the dispatch request list. */
    private void dropDispatchRequests() {
        final List<DispatchRequest> requests = this.dispatchRequests;
        if (requests == null) {
            return;
        }
        requests.clear();
        this.dispatchRequests = null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
*/
package org.apache.rocketmq.tieredstore.core;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,6 +47,7 @@
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
import org.apache.rocketmq.tieredstore.file.FlatFileInterface;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.index.IndexService;
Expand All @@ -65,6 +71,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
protected final MessageStoreFilter topicFilter;
protected final Semaphore semaphore;
protected final IndexService indexService;
protected final Map<FlatFileInterface, GroupCommitContext> failedGroupCommitMap;

public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
this.messageStore = messageStore;
Expand All @@ -77,13 +84,19 @@ public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
this.flatFileStore = messageStore.getFlatFileStore();
this.storeExecutor = messageStore.getStoreExecutor();
this.indexService = messageStore.getIndexService();
this.failedGroupCommitMap = new ConcurrentHashMap<>();
}

/**
 * Names this service after the {@code MessageStoreDispatcher} type (its simple class name),
 * not after this implementation class.
 *
 * @return the simple class name of {@code MessageStoreDispatcher}
 */
@Override
public String getServiceName() {
    final Class<?> serviceType = MessageStoreDispatcher.class;
    return serviceType.getSimpleName();
}

/**
 * Exposes the map of group-commit contexts that are kept per flat file after a failed commit.
 * The internal map instance itself is returned, not a copy.
 *
 * @return the failed group-commit contexts keyed by flat file
 */
@VisibleForTesting
public Map<FlatFileInterface, GroupCommitContext> getFailedGroupCommitMap() {
    return this.failedGroupCommitMap;
}

public void dispatchWithSemaphore(FlatFileInterface flatFile) {
try {
if (stopped) {
Expand Down Expand Up @@ -153,10 +166,22 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,

// If the previous commit fails, attempt to trigger a commit directly.
if (commitOffset < currentOffset) {
this.commitAsync(flatFile);
this.commitAsync(flatFile).whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("MessageDispatcher#flatFile commitOffset less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", topic, queueId, throwable);
}
});
return CompletableFuture.completedFuture(false);
}

if (failedGroupCommitMap.containsKey(flatFile)) {
GroupCommitContext failedCommit = failedGroupCommitMap.get(flatFile);
if (failedCommit.getEndOffset() <= commitOffset) {
failedGroupCommitMap.remove(flatFile);
constructIndexFile(flatFile.getTopicId(), failedCommit);
}
}

if (currentOffset < minOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
Expand Down Expand Up @@ -224,13 +249,16 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
}

long offset = currentOffset;
List<SelectMappedBufferResult> appendingBufferList = new ArrayList<>();
List<DispatchRequest> dispatchRequestList = new ArrayList<>();
for (; offset < targetOffset; offset++) {
cqUnit = consumeQueue.get(offset);
bufferSize += cqUnit.getSize();
if (bufferSize >= groupCommitSize) {
break;
}
message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
appendingBufferList.add(message);

ByteBuffer byteBuffer = message.getByteBuffer();
AppendResult result = flatFile.appendCommitLog(message);
Expand All @@ -251,22 +279,40 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
result = flatFile.appendConsumeQueue(dispatchRequest);
if (!AppendResult.SUCCESS.equals(result)) {
break;
} else {
dispatchRequestList.add(dispatchRequest);
}
}

GroupCommitContext groupCommitContext = new GroupCommitContext();
groupCommitContext.setEndOffset(offset);
groupCommitContext.setBufferList(appendingBufferList);
groupCommitContext.setDispatchRequests(dispatchRequestList);

// If there are many messages waiting to be uploaded, call the upload logic immediately.
boolean repeat = timeout || maxOffsetInQueue - offset > storeConfig.getTieredStoreGroupCommitCount();

if (!flatFile.getDispatchRequestList().isEmpty()) {
if (!dispatchRequestList.isEmpty()) {
Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId)
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(offset - currentOffset, attributes);

this.commitAsync(flatFile).whenComplete((unused, throwable) -> {
if (repeat) {
this.commitAsync(flatFile).whenComplete((success, throwable) -> {
if (success) {
constructIndexFile(flatFile.getTopicId(), groupCommitContext);
}
else {
// Commit failed: keep the context so a later dispatch can build the index file once the commit offset has caught up.
GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext);
if (oldCommit != null) {
log.warn("MessageDispatcher#commitAsync failed,flatFile old failed commit context not release, topic={}, queueId={} ", topic, queueId);
oldCommit.release();
}
}
if (success && repeat) {
storeExecutor.commonExecutor.submit(() -> dispatchWithSemaphore(flatFile));
}
}
Expand All @@ -282,22 +328,28 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
return CompletableFuture.completedFuture(false);
}

public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) {
return flatFile.commitAsync().thenAcceptAsync(success -> {
if (success) {
if (storeConfig.isMessageIndexEnable()) {
flatFile.getDispatchRequestList().forEach(
request -> constructIndexFile(flatFile.getTopicId(), request));
/**
 * Starts an asynchronous commit of the given flat file by delegating to
 * {@code flatFile.commitAsync()}.
 *
 * @param flatFile the flat file to commit
 * @return the future returned by the flat file's own {@code commitAsync()}
 */
public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFile) {
    final CompletableFuture<Boolean> commitFuture = flatFile.commitAsync();
    return commitFuture;
}

public void constructIndexFile(long topicId, GroupCommitContext groupCommitContext) {
MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
if (storeConfig.isMessageIndexEnable()) {
try {
groupCommitContext.getDispatchRequests().forEach(request -> constructIndexFile0(topicId, request));
}
catch (Throwable e) {
log.error("constructIndexFile error {}", topicId, e);
}
flatFile.release();
}
}, storeExecutor.bufferCommitExecutor);
groupCommitContext.release();
});
}

/**
* Building indexes with offsetId is no longer supported because offsetId has changed in tiered storage
*/
public void constructIndexFile(long topicId, DispatchRequest request) {
public void constructIndexFile0(long topicId, DispatchRequest request) {
Set<String> keySet = new HashSet<>();
if (StringUtils.isNotBlank(request.getUniqKey())) {
keySet.add(request.getUniqKey());
Expand All @@ -309,12 +361,27 @@ public void constructIndexFile(long topicId, DispatchRequest request) {
request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
}

/**
 * Discards the pending group-commit contexts whose flat file has been closed and releases
 * the resources they hold.
 *
 * <p>The map may be updated by commit callbacks while this method runs, so each entry is
 * first claimed with the atomic {@code remove(key, value)} and only released when that
 * removal succeeds. This prevents releasing a context that another thread already took
 * ownership of, and prevents dropping a newer context for the same flat file without
 * releasing it.
 */
public void releaseClosedPendingGroupCommit() {
    // Iteration over the concurrent map is weakly consistent, so removing while iterating is safe.
    for (Map.Entry<FlatFileInterface, GroupCommitContext> entry : failedGroupCommitMap.entrySet()) {
        FlatFileInterface flatFile = entry.getKey();
        if (!flatFile.isClosed()) {
            continue;
        }
        GroupCommitContext pendingContext = entry.getValue();
        // Remove only if the key is still mapped to the context observed above.
        if (failedGroupCommitMap.remove(flatFile, pendingContext)) {
            pendingContext.release();
        }
    }
}


@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);

releaseClosedPendingGroupCommit();

this.waitForRunning(Duration.ofSeconds(20).toMillis());
} catch (Throwable t) {
log.error("MessageStore dispatch error", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.tieredstore.file;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import org.apache.rocketmq.common.BoundaryType;
Expand Down Expand Up @@ -58,8 +57,6 @@ public interface FlatFileInterface {
*/
AppendResult appendConsumeQueue(DispatchRequest request);

List<DispatchRequest> getDispatchRequestList();

void release();

long getMinStoreTimestamp();
Expand Down Expand Up @@ -143,6 +140,8 @@ public interface FlatFileInterface {
*/
CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp, BoundaryType boundaryType);

/**
 * Reports whether this flat file is closed.
 * NOTE(review): the exact point at which a file counts as closed is decided by the
 * implementations, which are not visible here - confirm before relying on it.
 *
 * @return {@code true} if this flat file is closed
 */
boolean isClosed();

/**
* Shutdown process
*/
Expand Down
Loading
Loading