Skip to content

Commit

Permalink
add log
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe authored and qianye1001 committed Jan 13, 2025
1 parent f32fe78 commit 903e4aa
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@

import com.google.common.collect.Maps;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
Expand All @@ -43,16 +52,6 @@
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public abstract class AbstractRocksDBStorage {
protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);

Expand Down Expand Up @@ -381,6 +380,7 @@ public synchronized boolean start() {
public synchronized boolean shutdown() {
try {
if (!this.loaded) {
LOGGER.info("RocksDBStorage is not loaded, shutdown OK. dbPath={}, readOnly={}", this.dbPath, this.readOnly);
return true;
}

Expand Down Expand Up @@ -437,9 +437,9 @@ public synchronized boolean shutdown() {
this.options = null;

this.loaded = false;
LOGGER.info("shutdown OK. {}", this.dbPath);
LOGGER.info("RocksDBStorage shutdown OK. dbPath={}, readOnly={}", this.dbPath, this.readOnly);
} catch (Exception e) {
LOGGER.error("shutdown Failed. {}", this.dbPath, e);
LOGGER.error("RocksDBStorage shutdown Failed. dbPath={}", this.dbPath, e);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,9 @@ public void shutdown() {
if (this.compactionService != null) {
this.compactionService.shutdown();
}

if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) {
this.rocksDBMessageStore.consumeQueueStore.shutdown();
}

this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
this.storeCheckpoint.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void loadMaxConsumeQueueOffsets() {
Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM;
Consumer<OffsetEntry> fn = entry -> {
topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset);
ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
};
try {
forEach(predicate, fn);
Expand Down

0 comments on commit 903e4aa

Please sign in to comment.