diff --git a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java index 411d49de4fc..fac1616ce57 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java @@ -91,9 +91,12 @@ public GetBlocksRequestHandler(NetworkNode networkNode, DaoStateService daoState public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) { long ts = System.currentTimeMillis(); - // We limit number of blocks to 3000 which is about 3 weeks. - List blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 3000)); - List rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList()); + // We limit number of blocks to 3000 which is about 3 weeks and about 5 MB on data + List blocks = daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight()); + List rawBlocks = new LinkedList<>(blocks).stream() + .map(RawBlock::fromBlock) + .limit(3000) + .collect(Collectors.toList()); GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce()); log.info("Received GetBlocksRequest from {} for blocks from height {}. " + "Building GetBlocksResponse with {} blocks took {} ms.", diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index 68e2769e14b..952b2ae57ae 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -87,6 +87,7 @@ public LiteNode(BlockParser blockParser, blockDownloadListener = (observable, oldValue, newValue) -> { if ((double) newValue == 1) { setupWalletBestBlockListener(); + maybeStartRequestingBlocks(); } }; } @@ -176,8 +177,13 @@ public void onFault(String errorMessage, @Nullable Connection connection) { } }); - if (!parseBlockchainComplete) + maybeStartRequestingBlocks(); + } + + private void maybeStartRequestingBlocks() { + if (walletsSetup.isDownloadComplete() && p2pNetworkReady && !parseBlockchainComplete) { startParseBlocks(); + } } // First we request the blocks from a full node @@ -192,7 +198,8 @@ protected void startParseBlocks() { return; } - // If we request blocks we increment the ConnectionState counter. + // If we request blocks we increment the ConnectionState counter so that the connection does not get reset from + // INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed ConnectionState.incrementExpectedInitialDataResponses(); if (chainHeight == daoStateService.getGenesisBlockHeight()) { @@ -229,6 +236,11 @@ private void onRequestedBlocksReceived(List blockList, Runnable onPars return; } + if (walletsSetup.isDownloadComplete() && chainTipHeight < bsqWalletService.getBestChainHeight()) { + // We need to request more blocks and increment the ConnectionState counter so that the connection does not get reset from + // INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed + ConnectionState.incrementExpectedInitialDataResponses(); + } runDelayedBatchProcessing(new ArrayList<>(blockList), () -> { double duration = System.currentTimeMillis() - ts; @@ -239,8 +251,12 @@ private void onRequestedBlocksReceived(List blockList, Runnable onPars // We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid. // We deal with that case at the setupWalletBestBlockListener method above. if (walletsSetup.isDownloadComplete() && daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) { + log.info("We have completed batch processing of {} blocks but we have still {} missing blocks and request again.", + blockList.size(), bsqWalletService.getBestChainHeight() - daoStateService.getChainHeight()); + liteNodeNetworkService.requestBlocks(daoStateService.getChainHeight() + 1); } else { + log.info("We have completed batch processing of {} blocks and we have reached the chain tip of the wallet.", blockList.size()); onParsingComplete.run(); onParseBlockChainComplete(); } diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateService.java b/core/src/main/java/bisq/core/dao/state/DaoStateService.java index e71ca09b6c0..ebdfd1e3f13 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateService.java @@ -349,15 +349,8 @@ public long getBlockTime(int height) { } public List getBlocksFromBlockHeight(int fromBlockHeight) { - return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE); - } - - public List getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) { - // We limit requests to numMaxBlocks blocks, to avoid performance issues and too - // large network data in case a node requests too far back in history. return getBlocks().stream() .filter(block -> block.getHeight() >= fromBlockHeight) - .limit(numMaxBlocks) .collect(Collectors.toList()); } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java index bb0d0266491..cc280588891 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java @@ -109,6 +109,7 @@ public interface ResponseListener { private Optional nodeAddressOfPreliminaryDataRequest = Optional.empty(); private Timer retryTimer; private boolean dataUpdateRequested; + private boolean allDataReceived; private boolean stopped; private int numRepeatedRequests = 0; @@ -361,18 +362,23 @@ public void onComplete(boolean wasTruncated) { if (wasTruncated) { if (numRepeatedRequests < 20) { + // If we had allDataReceived already set to true but get a response with truncated flag, + // we still repeat the request to that node for higher redundancy. Otherwise, one seed node + // providing incomplete data would stop others to fill the gaps. log.info("DataResponse did not contain all data, so we repeat request until we got all data"); UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), 2); - } else { + } else if (!allDataReceived) { + allDataReceived = true; log.warn("\n#################################################################\n" + - "Loading initial data did not complete after 20 repeated requests. \n" + - "#################################################################\n"); + "Loading initial data from {} did not complete after 20 repeated requests. \n" + + "#################################################################\n", nodeAddress); checkNotNull(listener).onDataReceived(); } - } else { - log.info("\n#################################################################\n" + - "Loading initial data completed\n" + - "#################################################################\n"); + } else if (!allDataReceived) { + allDataReceived = true; + log.info("\n\n#################################################################\n" + + "Loading initial data from {} completed\n" + + "#################################################################\n", nodeAddress); checkNotNull(listener).onDataReceived(); } } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/messages/GetDataResponse.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/messages/GetDataResponse.java index 459138614d9..4cae802e80d 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/messages/GetDataResponse.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/messages/GetDataResponse.java @@ -128,7 +128,7 @@ public static GetDataResponse fromProto(protobuf.GetDataResponse proto, boolean wasTruncated = proto.getWasTruncated(); log.info("\n\n<< Received a GetDataResponse with {} {}\n", Utilities.readableFileSize(proto.getSerializedSize()), - wasTruncated ? " (was truncated)" : ""); + wasTruncated ? " (still data missing)" : " (all data received)"); Set dataSet = proto.getDataSetList().stream() .map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry)).collect(Collectors.toSet()); Set persistableNetworkPayloadSet = proto.getPersistableNetworkPayloadItemsList().stream()