Skip to content

Commit

Permalink
CreateFlushConsumeQueueService for loadAndStartConsumerServiceOnly
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe authored and qianye1001 committed Jan 2, 2025
1 parent f32fe78 commit f7b5628
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ public synchronized boolean start() {
public synchronized boolean shutdown() {
try {
if (!this.loaded) {
LOGGER.info("shutdown OK. {} is not loaded", this.dbPath);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class DefaultMessageStore implements MessageStore {

protected final ConsumeQueueStoreInterface consumeQueueStore;

private final FlushConsumeQueueService flushConsumeQueueService;
protected final FlushConsumeQueueService flushConsumeQueueService;

protected final CleanCommitLogService cleanCommitLogService;

Expand Down Expand Up @@ -517,11 +517,9 @@ public void shutdown() {
if (this.compactionService != null) {
this.compactionService.shutdown();
}

if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) {
this.rocksDBMessageStore.consumeQueueStore.shutdown();
}

this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
this.storeCheckpoint.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ public CleanConsumeQueueService createCleanConsumeQueueService() {
return new RocksDBCleanConsumeQueueService();
}

@Override
public FlushConsumeQueueService createFlushConsumeQueueService() {
return new RocksDBFlushConsumeQueueService();
}

@Override
public CorrectLogicOffsetService createCorrectLogicOffsetService() {
return new RocksDBCorrectLogicOffsetService();
Expand Down Expand Up @@ -198,6 +193,7 @@ public void loadAndStartConsumerServiceOnly() {
throw new RuntimeException("load consume queue failed");
}
super.loadCheckPoint();
this.flushConsumeQueueService.start();
this.consumeQueueStore.start();
} catch (Exception e) {
ERROR_LOG.error("loadAndStartConsumerServiceOnly error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void loadMaxConsumeQueueOffsets() {
Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM;
Consumer<OffsetEntry> fn = entry -> {
topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset);
ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
};
try {
forEach(predicate, fn);
Expand Down

0 comments on commit f7b5628

Please sign in to comment.