Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-70-2]Delete topic queue lock #8662

Open
wants to merge 52 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
2e7d146
optimize put message logic and lock mechanisms
3424672656 Jul 5, 2024
6de5073
optimize thread task assignment
3424672656 Jul 5, 2024
9a38392
optimize code typo
3424672656 Jul 6, 2024
e989189
optimize retreat strategy locking mechanism
3424672656 Jul 8, 2024
ae62212
Perfect message receiving logic
3424672656 Jul 15, 2024
4ca50fe
Add Update configuration interface
3424672656 Jul 17, 2024
ad660d8
optimize code type
3424672656 Jul 23, 2024
2589b12
optimize retreat strategy locking mechanism
3424672656 Jul 24, 2024
f941902
optimize code type
3424672656 Jul 24, 2024
b7c5dd7
optimize code type
3424672656 Jul 24, 2024
3e630a0
optimize code type
3424672656 Jul 24, 2024
24cae6f
optimize code type
3424672656 Jul 24, 2024
2aaade9
optimize code type
3424672656 Jul 24, 2024
d059816
optimize message send logic
3424672656 Jul 24, 2024
1c16d9d
optimize message send logic
3424672656 Jul 24, 2024
b6da844
optimize message send logic
3424672656 Jul 24, 2024
eda8e8f
optimize message send logic
3424672656 Jul 24, 2024
a68c305
optimize message send logic
3424672656 Jul 24, 2024
2147de0
optimize message send logic
3424672656 Jul 24, 2024
d976997
optimize message send logic
3424672656 Jul 24, 2024
435d4dc
optimize dledger message send logic
3424672656 Jul 24, 2024
538114b
optimize dledger message send logic
3424672656 Jul 24, 2024
6b72545
optimize message send logic
3424672656 Jul 25, 2024
3c3442f
optimize message send logic
3424672656 Jul 25, 2024
63c32ba
optimize message send logic
3424672656 Jul 25, 2024
300f0e9
optimize doappend logic
3424672656 Jul 25, 2024
d20d654
optimize doappend logic
3424672656 Jul 25, 2024
77dcbee
optimize retreat strategy locking mechanism
3424672656 Jul 25, 2024
a674e0e
optimize message send logic
3424672656 Jul 25, 2024
cf5797f
optimize retreat strategy locking mechanism
3424672656 Jul 25, 2024
19cc82d
optimize retreat strategy locking mechanism
3424672656 Jul 25, 2024
bdf2219
optimize exception log
3424672656 Jul 25, 2024
afec4b0
Optimized code format
3424672656 Jul 25, 2024
ffb5993
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
896e234
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
9b8c607
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
14c1600
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
d8b6330
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
e9afdb2
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
cd750b7
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
b7aba8c
optimize retreat strategy locking mechanism
3424672656 Sep 7, 2024
8169742
extract the adaptive lock
3424672656 Sep 7, 2024
3ad92b6
optimize code type
3424672656 Sep 7, 2024
87576c2
Optimized exception capture
Sep 19, 2024
9fb0d83
Merge branch 'apache:develop' into delete_queue_lock
3424672656 Oct 11, 2024
fb1e23b
Merge branch 'develop' of https://github.com/3424672656/rocketmq into…
Oct 19, 2024
22cfe77
Merge branch 'delete_queue_lock' of https://github.com/3424672656/roc…
Oct 19, 2024
1e21f57
Merge branch 'develop' into delete_queue_lock
3424672656 Nov 1, 2024
6a2f31f
Merge branch 'develop' of https://github.com/3424672656/rocketmq into…
Nov 19, 2024
c4775a9
Merge branch 'delete_queue_lock' of https://github.com/3424672656/roc…
Nov 19, 2024
d5929ee
null
Dec 20, 2024
ddda24e
Merge branch 'apache:develop' into delete_queue_lock
3424672656 Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
354 changes: 194 additions & 160 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Large diffs are not rendered by default.

13 changes: 4 additions & 9 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
Expand Down Expand Up @@ -771,17 +770,13 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String
}

@Override
public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) {
String topicQueueKey = getTopic() + "-" + getQueueId();
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
msg.setQueueOffset(queueOffset);
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, short messageNum) {
queueOffsetOperator.increaseQueueOffset(topic + "-" + queueId, messageNum);
}

@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg,
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
public long getQueueOffset(QueueOffsetOperator queueOffsetOperator) {
return queueOffsetOperator.getQueueOffset(topic + "-" + queueId);
}

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ public void setBrokerInitMaxOffset(long brokerInitMaxOffset) {
this.brokerInitMaxOffset = brokerInitMaxOffset;
}

@Override
public SystemClock getSystemClock() {
return systemClock;
}
Expand Down Expand Up @@ -2016,6 +2017,7 @@ public void setStateMachineVersion(long stateMachineVersion) {
this.stateMachineVersion = stateMachineVersion;
}

@Override
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
Expand All @@ -2024,6 +2026,7 @@ public BrokerConfig getBrokerConfig() {
return brokerConfig;
}

@Override
public int remainTransientStoreBufferNumbs() {
if (this.isTransientStorePoolEnable()) {
return this.transientStorePool.availableBufferNums();
Expand Down Expand Up @@ -2123,23 +2126,6 @@ public boolean isSyncMaster() {
return BrokerRole.SYNC_MASTER == this.getMessageStoreConfig().getBrokerRole();
}

@Override
public void assignOffset(MessageExtBrokerInner msg) throws RocksDBException {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
this.consumeQueueStore.assignQueueOffset(msg);
}
}

@Override
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
this.consumeQueueStore.increaseQueueOffset(msg, messageNum);
}
}

public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
return this.topicConfigTable;
Expand Down
17 changes: 0 additions & 17 deletions store/src/main/java/org/apache/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -742,23 +742,6 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma
*/
boolean isSyncMaster();

/**
* Assign a message to queue offset. If there is a race condition, you need to lock/unlock this method
* yourself.
*
* @param msg message
* @throws RocksDBException
*/
void assignOffset(MessageExtBrokerInner msg) throws RocksDBException;

/**
* Increase queue offset in memory table. If there is a race condition, you need to lock/unlock this method
*
* @param msg message
* @param messageNum message num
*/
void increaseOffset(MessageExtBrokerInner msg, short messageNum);

/**
* Get master broker message store in process in broker container
*
Expand Down
54 changes: 0 additions & 54 deletions store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -570,52 +570,48 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner

encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
new AppendMessageResult(encodeResult.status)));
}

String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
topicQueueLock.lock(topicQueueKey);
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
defaultMessageStore.assignOffset(msg);

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(msg, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;

int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
msg.setQueueOffset(this.defaultMessageStore.getQueueStore().getQueueOffset(msg.getTopic(), msg.getQueueId()));
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(msg, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY,
new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;

int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length,
msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);

defaultMessageStore.getQueueStore().increaseQueueOffset(msg.getTopic(), msg.getQueueId(), getMessageNum(msg));
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, msg.getBody().length, appendResult);
}

defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
} catch (Exception e) {
log.error("Put message error", e);
} catch (Exception ex) {
log.error("Put message error", ex);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
topicQueueLock.unlock(topicQueueKey);
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}

return dledgerFuture.thenApply(appendEntryResponse -> {
Expand Down Expand Up @@ -688,78 +684,73 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess

encodeResult = this.messageSerializer.serialize(messageExtBatch);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status)));
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
new AppendMessageResult(encodeResult.status)));
}

int batchNum = encodeResult.batchData.size();
topicQueueLock.lock(encodeResult.queueOffsetKey);
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config

try {
defaultMessageStore.assignOffset(messageExtBatch);
messageExtBatch.setQueueOffset(this.defaultMessageStore.getQueueStore().getQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId()));

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
long elapsedTimeInLock;
long queueOffset;
int msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(messageExtBatch, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (appendFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;

long wroteOffset = 0;
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(messageExtBatch, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (appendFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY,
new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;

long wroteOffset = 0;

int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

boolean isFirstOffset = true;
long firstWroteOffset = 0;
for (long pos : dledgerFuture.getPositions()) {
wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
if (isFirstOffset) {
firstWroteOffset = wroteOffset;
isFirstOffset = false;
}
String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
msgNum++;
boolean isFirstOffset = true;
long firstWroteOffset = 0;
for (long pos : dledgerFuture.getPositions()) {
wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
if (isFirstOffset) {
firstWroteOffset = wroteOffset;
isFirstOffset = false;
}
String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
msgNum++;
}

elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
appendResult.setMsgNum(msgNum);
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
appendResult.setMsgNum(msgNum);

defaultMessageStore.getQueueStore().increaseQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), (short) encodeResult.batchData.size());
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
}

defaultMessageStore.increaseOffset(messageExtBatch, (short) batchNum);

} catch (Exception e) {
log.error("Put message error", e);
} catch (Exception re) {
log.error("Put message error", re);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
topicQueueLock.unlock(encodeResult.queueOffsetKey);
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}

return dledgerFuture.thenApply(appendEntryResponse -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,16 +588,6 @@ public boolean isSyncMaster() {
return next.isSyncMaster();
}

@Override
public void assignOffset(MessageExtBrokerInner msg) throws RocksDBException {
next.assignOffset(msg);
}

@Override
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
next.increaseOffset(msg, messageNum);
}

@Override
public List<PutMessageHook> getPutMessageHookList() {
return next.getPutMessageHookList();
Expand Down
Loading
Loading