Skip to content

Commit

Permalink
Add flag to avoid multiple calls of onDataReceived
Browse files Browse the repository at this point in the history
Only start requesting blocks after wallet is synced.
Update connection state when doing repeated block requests.
Improve logs.
  • Loading branch information
djing-chan committed Nov 19, 2023
1 parent 536a29f commit 34857c9
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,12 @@ public GetBlocksRequestHandler(NetworkNode networkNode, DaoStateService daoState

public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 3000 which is about 3 weeks.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 3000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
// We limit number of blocks to 3000 which is about 3 weeks and about 5 MB on data
List<Block> blocks = daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight());
List<RawBlock> rawBlocks = new LinkedList<>(blocks).stream()
.map(RawBlock::fromBlock)
.limit(3000)
.collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
Expand Down
20 changes: 18 additions & 2 deletions core/src/main/java/bisq/core/dao/node/lite/LiteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public LiteNode(BlockParser blockParser,
blockDownloadListener = (observable, oldValue, newValue) -> {
if ((double) newValue == 1) {
setupWalletBestBlockListener();
maybeStartRequestingBlocks();
}
};
}
Expand Down Expand Up @@ -176,8 +177,13 @@ public void onFault(String errorMessage, @Nullable Connection connection) {
}
});

if (!parseBlockchainComplete)
maybeStartRequestingBlocks();
}

private void maybeStartRequestingBlocks() {
if (walletsSetup.isDownloadComplete() && p2pNetworkReady && !parseBlockchainComplete) {
startParseBlocks();
}
}

// First we request the blocks from a full node
Expand All @@ -192,7 +198,8 @@ protected void startParseBlocks() {
return;
}

// If we request blocks we increment the ConnectionState counter.
// If we request blocks we increment the ConnectionState counter so that the connection does not get reset from
// INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed
ConnectionState.incrementExpectedInitialDataResponses();

if (chainHeight == daoStateService.getGenesisBlockHeight()) {
Expand Down Expand Up @@ -229,6 +236,11 @@ private void onRequestedBlocksReceived(List<RawBlock> blockList, Runnable onPars
return;
}

if (walletsSetup.isDownloadComplete() && chainTipHeight < bsqWalletService.getBestChainHeight()) {
// We need to request more blocks and increment the ConnectionState counter so that the connection does not get reset from
// INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed
ConnectionState.incrementExpectedInitialDataResponses();
}
runDelayedBatchProcessing(new ArrayList<>(blockList),
() -> {
double duration = System.currentTimeMillis() - ts;
Expand All @@ -239,8 +251,12 @@ private void onRequestedBlocksReceived(List<RawBlock> blockList, Runnable onPars
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
// We deal with that case at the setupWalletBestBlockListener method above.
if (walletsSetup.isDownloadComplete() && daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
log.info("We have completed batch processing of {} blocks but we have still {} missing blocks and request again.",
blockList.size(), bsqWalletService.getBestChainHeight() - daoStateService.getChainHeight());

liteNodeNetworkService.requestBlocks(daoStateService.getChainHeight() + 1);
} else {
log.info("We have completed batch processing of {} blocks and we have reached the chain tip of the wallet.", blockList.size());
onParsingComplete.run();
onParseBlockChainComplete();
}
Expand Down
7 changes: 0 additions & 7 deletions core/src/main/java/bisq/core/dao/state/DaoStateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,8 @@ public long getBlockTime(int height) {
}

public List<Block> getBlocksFromBlockHeight(int fromBlockHeight) {
return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE);
}

public List<Block> getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) {
// We limit requests to numMaxBlocks blocks, to avoid performance issues and too
// large network data in case a node requests too far back in history.
return getBlocks().stream()
.filter(block -> block.getHeight() >= fromBlockHeight)
.limit(numMaxBlocks)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public interface ResponseListener {
private Optional<NodeAddress> nodeAddressOfPreliminaryDataRequest = Optional.empty();
private Timer retryTimer;
private boolean dataUpdateRequested;
private boolean allDataReceived;
private boolean stopped;
private int numRepeatedRequests = 0;

Expand Down Expand Up @@ -361,18 +362,23 @@ public void onComplete(boolean wasTruncated) {

if (wasTruncated) {
if (numRepeatedRequests < 20) {
// If we had allDataReceived already set to true but get a response with truncated flag,
// we still repeat the request to that node for higher redundancy. Otherwise, one seed node
// providing incomplete data would stop others to fill the gaps.
log.info("DataResponse did not contain all data, so we repeat request until we got all data");
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), 2);
} else {
} else if (!allDataReceived) {
allDataReceived = true;
log.warn("\n#################################################################\n" +
"Loading initial data did not complete after 20 repeated requests. \n" +
"#################################################################\n");
"Loading initial data from {} did not complete after 20 repeated requests. \n" +
"#################################################################\n", nodeAddress);
checkNotNull(listener).onDataReceived();
}
} else {
log.info("\n#################################################################\n" +
"Loading initial data completed\n" +
"#################################################################\n");
} else if (!allDataReceived) {
allDataReceived = true;
log.info("\n\n#################################################################\n" +
"Loading initial data from {} completed\n" +
"#################################################################\n", nodeAddress);
checkNotNull(listener).onDataReceived();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
boolean wasTruncated = proto.getWasTruncated();
log.info("\n\n<< Received a GetDataResponse with {} {}\n",
Utilities.readableFileSize(proto.getSerializedSize()),
wasTruncated ? " (was truncated)" : "");
wasTruncated ? " (still data missing)" : " (all data received)");
Set<ProtectedStorageEntry> dataSet = proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry)).collect(Collectors.toSet());
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = proto.getPersistableNetworkPayloadItemsList().stream()
Expand Down

0 comments on commit 34857c9

Please sign in to comment.