Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered storage #9071

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.tieredstore.common;

import java.util.List;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;

public class GroupCommitContext {

private long endOffset;

private List<SelectMappedBufferResult> bufferList;

private List<DispatchRequest> dispatchRequests;

public long getEndOffset() {
return endOffset;
}

public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

public List<SelectMappedBufferResult> getBufferList() {
return bufferList;
}

public void setBufferList(List<SelectMappedBufferResult> bufferList) {
this.bufferList = bufferList;
}

public List<DispatchRequest> getDispatchRequests() {
return dispatchRequests;
}

public void setDispatchRequests(List<DispatchRequest> dispatchRequests) {
this.dispatchRequests = dispatchRequests;
}

public void release() {
if (bufferList != null) {
for (SelectMappedBufferResult bufferResult : bufferList) {
bufferResult.release();
}
bufferList.clear();
bufferList = null;
}
if (dispatchRequests != null) {
dispatchRequests.clear();
dispatchRequests = null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,6 +46,7 @@
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
import org.apache.rocketmq.tieredstore.file.FlatFileInterface;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.index.IndexService;
Expand All @@ -65,6 +70,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
protected final MessageStoreFilter topicFilter;
protected final Semaphore semaphore;
protected final IndexService indexService;
protected final Map<FlatFileInterface, GroupCommitContext> failedGroupCommitMap;

public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
this.messageStore = messageStore;
Expand All @@ -77,6 +83,7 @@ public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
this.flatFileStore = messageStore.getFlatFileStore();
this.storeExecutor = messageStore.getStoreExecutor();
this.indexService = messageStore.getIndexService();
this.failedGroupCommitMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -153,10 +160,22 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,

// If the previous commit fails, attempt to trigger a commit directly.
if (commitOffset < currentOffset) {
this.commitAsync(flatFile);
this.commitAsync(flatFile).whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("topic: {}, queueId: {} flat file flush cache failed more than twice.", topic, queueId, throwable);
wangshaojie4039 marked this conversation as resolved.
Show resolved Hide resolved
}
});
return CompletableFuture.completedFuture(false);
}

if (failedGroupCommitMap.containsKey(flatFile)) {
GroupCommitContext failedCommit = failedGroupCommitMap.get(flatFile);
if (failedCommit.getEndOffset() <= commitOffset) {
failedGroupCommitMap.remove(flatFile);
constructIndexFile(flatFile.getTopicId(), failedCommit);
}
}

if (currentOffset < minOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
Expand Down Expand Up @@ -224,13 +243,16 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
}

long offset = currentOffset;
List<SelectMappedBufferResult> appendingBufferList = new ArrayList<>();
List<DispatchRequest> dispatchRequestList = new ArrayList<>();
for (; offset < targetOffset; offset++) {
cqUnit = consumeQueue.get(offset);
bufferSize += cqUnit.getSize();
if (bufferSize >= groupCommitSize) {
break;
}
message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
appendingBufferList.add(message);

ByteBuffer byteBuffer = message.getByteBuffer();
AppendResult result = flatFile.appendCommitLog(message);
Expand All @@ -251,22 +273,40 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
result = flatFile.appendConsumeQueue(dispatchRequest);
if (!AppendResult.SUCCESS.equals(result)) {
break;
} else {
dispatchRequestList.add(dispatchRequest);
}
}

GroupCommitContext groupCommitContext = new GroupCommitContext();
groupCommitContext.setEndOffset(offset);
groupCommitContext.setBufferList(appendingBufferList);
groupCommitContext.setDispatchRequests(dispatchRequestList);

// If there are many messages waiting to be uploaded, call the upload logic immediately.
boolean repeat = timeout || maxOffsetInQueue - offset > storeConfig.getTieredStoreGroupCommitCount();

if (!flatFile.getDispatchRequestList().isEmpty()) {
if (!dispatchRequestList.isEmpty()) {
Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId)
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(offset - currentOffset, attributes);

this.commitAsync(flatFile).whenComplete((unused, throwable) -> {
if (repeat) {
this.commitAsync(flatFile).whenComplete((success, throwable) -> {
if (success) {
constructIndexFile(flatFile.getTopicId(), groupCommitContext);
}
else {
//next commit async,execute constructIndexFile.
GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext);
if (oldCommit != null) {
log.warn("MessageDispatcher#dispatch, topic={}, queueId={} old failed commit context not release", topic, queueId);
oldCommit.release();
}
}
if (success && repeat) {
storeExecutor.commonExecutor.submit(() -> dispatchWithSemaphore(flatFile));
}
}
Expand All @@ -282,22 +322,28 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
return CompletableFuture.completedFuture(false);
}

public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) {
return flatFile.commitAsync().thenAcceptAsync(success -> {
if (success) {
if (storeConfig.isMessageIndexEnable()) {
flatFile.getDispatchRequestList().forEach(
request -> constructIndexFile(flatFile.getTopicId(), request));
public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFile) {
return flatFile.commitAsync();
}

private void constructIndexFile(long topicId, GroupCommitContext groupCommitContext) {
MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
if (storeConfig.isMessageIndexEnable()) {
try {
groupCommitContext.getDispatchRequests().forEach(request -> constructIndexFile0(topicId, request));
}
catch (Throwable e) {
log.error("constructIndexFile error {}", topicId, e);
}
flatFile.release();
}
}, storeExecutor.bufferCommitExecutor);
groupCommitContext.release();
});
}

/**
* Building indexes with offsetId is no longer supported because offsetId has changed in tiered storage
*/
public void constructIndexFile(long topicId, DispatchRequest request) {
public void constructIndexFile0(long topicId, DispatchRequest request) {
Set<String> keySet = new HashSet<>();
if (StringUtils.isNotBlank(request.getUniqKey())) {
keySet.add(request.getUniqKey());
Expand All @@ -309,12 +355,27 @@ public void constructIndexFile(long topicId, DispatchRequest request) {
request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
}

private void releaseClosedPendingGroupCommit() {
Iterator<Map.Entry<FlatFileInterface, GroupCommitContext>> iterator = failedGroupCommitMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<FlatFileInterface, GroupCommitContext> entry = iterator.next();
if (entry.getKey().isClosed()) {
entry.getValue().release();
iterator.remove();
}
}
}


@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);

releaseClosedPendingGroupCommit();

this.waitForRunning(Duration.ofSeconds(20).toMillis());
} catch (Throwable t) {
log.error("MessageStore dispatch error", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.tieredstore.file;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import org.apache.rocketmq.common.BoundaryType;
Expand Down Expand Up @@ -58,8 +57,6 @@ public interface FlatFileInterface {
*/
AppendResult appendConsumeQueue(DispatchRequest request);

List<DispatchRequest> getDispatchRequestList();

void release();

long getMinStoreTimestamp();
Expand Down Expand Up @@ -143,6 +140,8 @@ public interface FlatFileInterface {
*/
CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp, BoundaryType boundaryType);

boolean isClosed();

/**
* Shutdown process
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -51,14 +52,13 @@ public class FlatMessageFile implements FlatFileInterface {

protected final String filePath;
protected final ReentrantLock fileLock;
protected final Semaphore commitLock = new Semaphore(1);
protected final MessageStoreConfig storeConfig;
protected final MetadataStore metadataStore;
protected final FlatCommitLogFile commitLog;
protected final FlatConsumeQueueFile consumeQueue;
protected final AtomicLong lastDestroyTime;

protected final List<SelectMappedBufferResult> bufferResultList;
protected final List<DispatchRequest> dispatchRequestList;
protected final ConcurrentMap<String, CompletableFuture<?>> inFlightRequestMap;

public FlatMessageFile(FlatFileFactory fileFactory, String topic, int queueId) {
Expand All @@ -76,8 +76,6 @@ public FlatMessageFile(FlatFileFactory fileFactory, String filePath) {
this.commitLog = fileFactory.createFlatFileForCommitLog(filePath);
this.consumeQueue = fileFactory.createFlatFileForConsumeQueue(filePath);
this.lastDestroyTime = new AtomicLong();
this.bufferResultList = new ArrayList<>();
this.dispatchRequestList = new ArrayList<>();
this.inFlightRequestMap = new ConcurrentHashMap<>();
}

Expand Down Expand Up @@ -156,7 +154,6 @@ public AppendResult appendCommitLog(SelectMappedBufferResult message) {
if (closed) {
return AppendResult.FILE_CLOSED;
}
this.bufferResultList.add(message);
return this.appendCommitLog(message.getByteBuffer());
}

Expand All @@ -172,29 +169,14 @@ public AppendResult appendConsumeQueue(DispatchRequest request) {
buffer.putLong(request.getTagsCode());
buffer.flip();

this.dispatchRequestList.add(request);
return consumeQueue.append(buffer, request.getStoreTimestamp());
}

@Override
public List<DispatchRequest> getDispatchRequestList() {
return dispatchRequestList;
}


@Override
public void release() {
for (SelectMappedBufferResult bufferResult : bufferResultList) {
bufferResult.release();
}

if (queueMetadata != null) {
log.trace("FlatMessageFile release, topic={}, queueId={}, bufferSize={}, requestListSize={}",
queueMetadata.getQueue().getTopic(), queueMetadata.getQueue().getQueueId(),
bufferResultList.size(), dispatchRequestList.size());
}

bufferResultList.clear();
dispatchRequestList.clear();
}

@Override
Expand Down Expand Up @@ -246,13 +228,18 @@ public long getConsumeQueueCommitOffset() {

@Override
public CompletableFuture<Boolean> commitAsync() {
// acquire lock
if (commitLock.drainPermits() <= 0) {
return CompletableFuture.completedFuture(false);
}

return this.commitLog.commitAsync()
.thenCompose(result -> {
if (result) {
return consumeQueue.commitAsync();
}
return CompletableFuture.completedFuture(false);
});
}).whenComplete((result, throwable) -> commitLock.release());
}

@Override
Expand Down Expand Up @@ -363,6 +350,11 @@ public boolean equals(Object obj) {
return StringUtils.equals(filePath, ((FlatMessageFile) obj).filePath);
}

@Override
public boolean isClosed() {
return closed;
}

@Override
public void shutdown() {
closed = true;
Expand Down
Loading