Skip to content

Commit

Permalink
feat(db/trans-cache): verify cached files with crc32c (#5523)
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 authored Oct 8, 2023
1 parent fe7715d commit c623940
Showing 1 changed file with 70 additions and 16 deletions.
86 changes: 70 additions & 16 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteSource;
import com.google.common.primitives.Longs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
Expand All @@ -18,6 +22,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -79,6 +84,8 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {

private final Path cacheFile0;
private final Path cacheFile1;
private String crc32c0;
private String crc32c1;
private final Path cacheProperties;
private final Path cacheDir;
private AtomicBoolean isValid = new AtomicBoolean(false);
Expand Down Expand Up @@ -281,13 +288,18 @@ private boolean recovery() {
CompletableFuture<Boolean> tk1 = loadProperties.thenApplyAsync(
v -> recovery(1, this.cacheFile1));

return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
try {
return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
} finally {
clearCrc32c();
}
}

private boolean recovery(int index, Path file) {
checkCrc32c(index, file);
try (InputStream in = new BufferedInputStream(Files.newInputStream(file,
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
logger.info("recovery bloomFilter[{}] from file.", index);
Expand Down Expand Up @@ -329,24 +341,38 @@ private void dump() {
() -> dump(0, this.cacheFile0));
CompletableFuture<Void> task1 = CompletableFuture.runAsync(
() -> dump(1, this.cacheFile1));
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");

}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
try {
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");
}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
} finally {
clearCrc32c();
}
}

private void dump(int index, Path file) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
bloomFilters[index].writeTo(out);
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.",
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
String crc32c = getCrc32c(file);
if (index == 0) {
this.crc32c0 = crc32c;
} else {
this.crc32c1 = crc32c;
}
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, "
+ "crc32c: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
crc32c, System.currentTimeMillis() - start);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -367,6 +393,8 @@ private boolean loadProperties() {
String.format("currentBlockNum not match. filter: %d, db: %d",
currentBlockNum, currentBlockNumFromDB));
}
this.crc32c0 = properties.getProperty("crc32c0");
this.crc32c1 = properties.getProperty("crc32c1");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
return true;
Expand All @@ -382,6 +410,8 @@ private void writeProperties() {
properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock));
properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum));
properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex));
properties.setProperty("crc32c0", this.crc32c0);
properties.setProperty("crc32c1", this.crc32c1);
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
Expand All @@ -390,6 +420,30 @@ private void writeProperties() {
}
}

private String getCrc32c(Path file) throws IOException {
ByteSource byteSource = com.google.common.io.Files.asByteSource(file.toFile());
HashCode hc = byteSource.hash(Hashing.crc32c());
return hc.toString();
}

private void checkCrc32c(int index, Path file) {
try {
String actual = getCrc32c(file);
String expect = index == 0 ? this.crc32c0 : this.crc32c1;
if (!Objects.equals(actual, expect)) {
throw new IllegalStateException("crc32c not match. index: " + index + ", expect: " + expect
+ ", actual: " + actual);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void clearCrc32c() {
this.crc32c0 = null;
this.crc32c1 = null;
}

@Override
public TxCacheDB newInstance() {
return new TxCacheDB(name, recentTransactionStore, dynamicPropertiesStore);
Expand Down

0 comments on commit c623940

Please sign in to comment.