diff --git a/src/main/java/org/tron/common/overlay/message/HelloMessage.java b/src/main/java/org/tron/common/overlay/message/HelloMessage.java index 454a9aa702b..d8d7abb3311 100755 --- a/src/main/java/org/tron/common/overlay/message/HelloMessage.java +++ b/src/main/java/org/tron/common/overlay/message/HelloMessage.java @@ -2,9 +2,12 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import org.springframework.beans.factory.annotation.Autowired; import org.tron.common.overlay.discover.Node; import org.tron.common.utils.ByteArray; +import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.args.Args; +import org.tron.core.db.Manager; import org.tron.core.net.message.MessageTypes; import org.tron.protos.Discover.Endpoint; import org.tron.protos.Protocol; @@ -33,7 +36,8 @@ public HelloMessage(byte type, byte[] rawData) { /** * Create hello message. */ - public HelloMessage(Node from, long timestamp) { + public HelloMessage(Node from, long timestamp, BlockCapsule.BlockId genesisBlockId, + BlockCapsule.BlockId solidBlockId, BlockCapsule.BlockId headBlockId){ Endpoint fromEndpoint = Endpoint.newBuilder() .setNodeId(ByteString.copyFrom(from.getId())) @@ -41,11 +45,29 @@ public HelloMessage(Node from, long timestamp) { .setAddress(ByteString.copyFrom(ByteArray.fromString(from.getHost()))) .build(); + Protocol.HelloMessage.BlockId gBlockId = Protocol.HelloMessage.BlockId.newBuilder() + .setHash(genesisBlockId.getByteString()) + .setNumber(genesisBlockId.getNum()) + .build(); + + Protocol.HelloMessage.BlockId sBlockId = Protocol.HelloMessage.BlockId.newBuilder() + .setHash(solidBlockId.getByteString()) + .setNumber(solidBlockId.getNum()) + .build(); + + Protocol.HelloMessage.BlockId hBlockId = Protocol.HelloMessage.BlockId.newBuilder() + .setHash(headBlockId.getByteString()) + .setNumber(headBlockId.getNum()) + .build(); + Builder builder = Protocol.HelloMessage.newBuilder(); builder.setFrom(fromEndpoint); builder.setVersion(Args.getInstance().getNodeP2pVersion()); builder.setTimestamp(timestamp); + builder.setGenesisBlockId(gBlockId); + builder.setSolidBlockId(sBlockId); + builder.setHeadBlockId(hBlockId); this.helloMessage = builder.build(); this.type = MessageTypes.P2P_HELLO.asByte(); @@ -76,41 +98,25 @@ public long getTimestamp(){ return this.helloMessage.getTimestamp(); } - /** - * Get listen port. - */ - public int getListenPort() { - return this.helloMessage.getFrom().getPort(); + public Node getFrom() { + Endpoint from = this.helloMessage.getFrom(); + return new Node(from.getNodeId().toByteArray(), + ByteArray.toStr(from.getAddress().toByteArray()), from.getPort()); } - /** - * Get peer ID. - */ - public String getPeerId() { - return ByteArray.toHexString(this.helloMessage.getFrom().getNodeId().toByteArray()); + public BlockCapsule.BlockId getGenesisBlockId(){ + return new BlockCapsule.BlockId(this.helloMessage.getGenesisBlockId().getHash(), + this.helloMessage.getGenesisBlockId().getNumber()); } - /** - * Set version of p2p protocol. - */ - public void setVersion(byte version) { - Builder builder = this.helloMessage.toBuilder(); - builder.setVersion(version); - this.helloMessage = builder.build(); + public BlockCapsule.BlockId getSolidBlockId(){ + return new BlockCapsule.BlockId(this.helloMessage.getSolidBlockId().getHash(), + this.helloMessage.getSolidBlockId().getNumber()); } - /** - * Get string. - */ - public String toString() { - return helloMessage.toString(); -// if (!this.unpacked) { -// this.unPack(); -// } -// return "[" + this.getCommand().name() + " p2pVersion=" -// + this.getVersion() + " clientId=" + this.getClientId() -// + " peerPort=" + this.getListenPort() + " peerId=" -// + this.getPeerId() + "]"; + public BlockCapsule.BlockId getHeadBlockId(){ + return new BlockCapsule.BlockId(this.helloMessage.getHeadBlockId().getHash(), + this.helloMessage.getHeadBlockId().getNumber()); } @Override @@ -123,9 +129,9 @@ public MessageTypes getType() { return MessageTypes.fromByte(this.type); } - public Node getFrom() { - Endpoint from = this.helloMessage.getFrom(); - return new Node(from.getNodeId().toByteArray(), - ByteArray.toStr(from.getAddress().toByteArray()), from.getPort()); + @Override + public String toString() { + return helloMessage.toString(); } + } \ No newline at end of file diff --git a/src/main/java/org/tron/common/overlay/message/Message.java b/src/main/java/org/tron/common/overlay/message/Message.java index d8ad889300d..e3a6b50ae55 100644 --- a/src/main/java/org/tron/common/overlay/message/Message.java +++ b/src/main/java/org/tron/common/overlay/message/Message.java @@ -49,8 +49,6 @@ public String toString() { public abstract Class getAnswerMessage(); - //public byte getCode() { return type; } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/org/tron/common/overlay/message/StaticMessages.java b/src/main/java/org/tron/common/overlay/message/StaticMessages.java index f562da44744..8f3cef03144 100644 --- a/src/main/java/org/tron/common/overlay/message/StaticMessages.java +++ b/src/main/java/org/tron/common/overlay/message/StaticMessages.java @@ -1,70 +1,9 @@ -/* - * Copyright (c) [2016] [ ] - * This file is part of the ethereumJ library. - * - * The ethereumJ library is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * The ethereumJ library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with the ethereumJ library. If not, see . - */ package org.tron.common.overlay.message; -import org.spongycastle.util.encoders.Hex; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.tron.common.overlay.discover.Node; -import org.tron.core.config.args.Args; - -/** - * This class contains static values of messages on the network. These message will always be the - * same and therefore don't need to be created each time. - * - * @author Roman Mandeleil - * @since 13.04.14 - */ @Component public class StaticMessages { - - @Autowired - Args args; - public final static PingMessage PING_MESSAGE = new PingMessage(); public final static PongMessage PONG_MESSAGE = new PongMessage(); - public final static DisconnectMessage DISCONNECT_MESSAGE = new DisconnectMessage( - ReasonCode.REQUESTED); - - public static final byte[] SYNC_TOKEN = Hex.decode("22400891"); - - public HelloMessage createHelloMessage(Node node, long timestamp) { - return new HelloMessage(node, timestamp); - } - - private String buildHelloAnnouncement() { - return "java-tron"; -// String version = config.projectVersion(); -// String numberVersion = version; -// Pattern pattern = Pattern.compile("^\\d+(\\.\\d+)*"); -// Matcher matcher = pattern.matcher(numberVersion); -// if (matcher.find()) { -// numberVersion = numberVersion.substring(matcher.start(), matcher.end()); -// } -// String system = System.getProperty("os.name"); -// if (system.contains(" ")) -// system = system.substring(0, system.indexOf(" ")); -// if (System.getProperty("java.vm.vendor").contains("Android")) -// system = "Android"; -// String phrase = config.helloPhrase(); -// -// return String.format("Ethereum(J)/v%s/%s/%s/Java/%s", numberVersion, system, -// config.projectVersionModifier().equalsIgnoreCase("release") ? "Release" : "Dev", phrase); - } } diff --git a/src/main/java/org/tron/common/overlay/server/Channel.java b/src/main/java/org/tron/common/overlay/server/Channel.java index 1cba75f8fdc..42e2569ee22 100644 --- a/src/main/java/org/tron/common/overlay/server/Channel.java +++ b/src/main/java/org/tron/common/overlay/server/Channel.java @@ -146,7 +146,7 @@ public void initNode(byte[] nodeId, int remotePort) { } public void disconnect(ReasonCode reason) { - logger.info("Send disconnect {}, reason:{}", ctx.channel().remoteAddress(), reason); + logger.info("Send disconnect to {}, reason:{}", ctx.channel().remoteAddress(), reason); getNodeStatistics().nodeDisconnectedLocal(reason); ctx.writeAndFlush(new DisconnectMessage(reason).getSendData()); close(); diff --git a/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java b/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java index 6a49d42678f..b9721c9b65a 100644 --- a/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java +++ b/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.ByteToMessageDecoder; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Arrays; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,8 @@ import org.tron.common.overlay.message.*; import org.tron.common.utils.ByteArray; import org.tron.core.config.args.Args; +import org.tron.core.db.Manager; +import org.tron.core.net.peer.PeerConnection; import static org.tron.common.overlay.message.StaticMessages.PONG_MESSAGE; @@ -48,14 +51,14 @@ public class HandshakeHandler extends ByteToMessageDecoder { private final NodeManager nodeManager; - private final ChannelManager channelManager; + private Manager manager; private P2pMessageFactory messageFactory = new P2pMessageFactory(); @Autowired - public HandshakeHandler(final NodeManager nodeManager, final ChannelManager channelManager) { + public HandshakeHandler(final NodeManager nodeManager, final Manager manager) { this.nodeManager = nodeManager; - this.channelManager = channelManager; + this.manager = manager; } @Override @@ -73,6 +76,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List ou byte[] encoded = new byte[buffer.readableBytes()]; buffer.readBytes(encoded); P2pMessage msg = messageFactory.create(encoded); + + logger.info("Handshake receive from {}, {}", ctx.channel().remoteAddress(), msg); + switch (msg.getType()) { case P2P_HELLO: handleHelloMsg(ctx, (HelloMessage)msg); @@ -100,22 +106,44 @@ public void setChannel(Channel channel, String remoteId) { } private void sendHelloMsg(ChannelHandlerContext ctx, long time){ - ctx.writeAndFlush(new HelloMessage(nodeManager.getPublicHomeNode(), time).getSendData()); + HelloMessage message = new HelloMessage(nodeManager.getPublicHomeNode(), time, + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + ctx.writeAndFlush(message.getSendData()); channel.getNodeStatistics().p2pOutHello.add(); } private void handleHelloMsg(ChannelHandlerContext ctx, HelloMessage msg) { if (remoteId.length != 64) { - channel.initNode(ByteArray.fromHexString(msg.getPeerId()), msg.getListenPort()); - if (msg.getVersion() != Args.getInstance().getNodeP2pVersion()) { - logger.info("Peer {} version not support, peer->{}, me->{}", - ctx.channel().remoteAddress(), msg.getVersion(), Args.getInstance().getNodeP2pVersion()); - channel.disconnect(ReasonCode.INCOMPATIBLE_PROTOCOL); - return; - } + channel.initNode(msg.getFrom().getId(), msg.getFrom().getPort()); + } + + if (msg.getVersion() != Args.getInstance().getNodeP2pVersion()) { + logger.info("Peer {} different p2p version, peer->{}, me->{}", + ctx.channel().remoteAddress(), msg.getVersion(), Args.getInstance().getNodeP2pVersion()); + channel.disconnect(ReasonCode.INCOMPATIBLE_PROTOCOL); + return; + } + + if (!Arrays.equals(manager.getGenesisBlockId().getBytes(), msg.getGenesisBlockId().getBytes())){ + logger.info("Peer {} different genesis block, peer->{}, me->{}", ctx.channel().remoteAddress(), + msg.getGenesisBlockId().getString(), manager.getGenesisBlockId().getString()); + channel.disconnect(ReasonCode.INCOMPATIBLE_CHAIN); + return; + } + + if (manager.getSolidBlockId().getNum() >= msg.getSolidBlockId().getNum() && !manager.containBlockInMainChain(msg.getSolidBlockId())){ + logger.info("Peer {} different solid block, peer->{}, me->{}", ctx.channel().remoteAddress(), + msg.getSolidBlockId().getString(), manager.getSolidBlockId().getString()); + channel.disconnect(ReasonCode.FORKED); + return; + } + + if (remoteId.length != 64) { sendHelloMsg(ctx, msg.getTimestamp()); } + ((PeerConnection)channel).setHelloMessage(msg); + channel.getNodeStatistics().p2pInHello.add(); channel.publicHandshakeFinished(ctx, msg); diff --git a/src/main/java/org/tron/common/overlay/server/MessageQueue.java b/src/main/java/org/tron/common/overlay/server/MessageQueue.java index 5f6f034941a..de25643a107 100644 --- a/src/main/java/org/tron/common/overlay/server/MessageQueue.java +++ b/src/main/java/org/tron/common/overlay/server/MessageQueue.java @@ -115,7 +115,7 @@ private void send() { } if (messageRoundtrip.getRetryTimes() > 0){ channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.PING_TIMEOUT); - logger.warn("wait {} timeout. close channel {}.", messageRoundtrip.getMsg().getAnswerMessage(), ctx.channel().remoteAddress()); + logger.warn("Wait {} timeout. close channel {}.", messageRoundtrip.getMsg().getAnswerMessage(), ctx.channel().remoteAddress()); channel.close(); return; } @@ -124,8 +124,6 @@ private void send() { ctx.writeAndFlush(msg.getSendData()); - logger.info("send {} to {}", msg.getType(), ctx.channel().remoteAddress()); - messageRoundtrip.incRetryTimes(); messageRoundtrip.saveTime(); } diff --git a/src/main/java/org/tron/core/db/Manager.java b/src/main/java/org/tron/core/db/Manager.java index dfcd852176b..3ea2e7af3f0 100644 --- a/src/main/java/org/tron/core/db/Manager.java +++ b/src/main/java/org/tron/core/db/Manager.java @@ -1008,6 +1008,15 @@ public long getSyncBeginNumber() { return dynamicPropertiesStore.getLatestBlockHeaderNumber() - revokingStore.size(); } + public BlockId getSolidBlockId() { + try{ + long num = dynamicPropertiesStore.getLatestSolidifiedBlockNum(); + return getBlockIdByNum(num); + }catch (Exception e){ + return getGenesisBlockId(); + } + } + /** * Determine if the current time is maintenance time. */ diff --git a/src/main/java/org/tron/core/net/message/FetchInvDataMessage.java b/src/main/java/org/tron/core/net/message/FetchInvDataMessage.java index f2345142e8f..5891f3a8681 100644 --- a/src/main/java/org/tron/core/net/message/FetchInvDataMessage.java +++ b/src/main/java/org/tron/core/net/message/FetchInvDataMessage.java @@ -33,7 +33,7 @@ public FetchInvDataMessage(List hashList, InventoryType type) { @Override public String toString() { - return super.toString() + ", type=" + MessageTypes.fromByte(this.type); + return super.toString(); } public MessageTypes getInvType() { diff --git a/src/main/java/org/tron/core/net/message/InventoryMessage.java b/src/main/java/org/tron/core/net/message/InventoryMessage.java index af6d5a047c3..ff0542e1024 100644 --- a/src/main/java/org/tron/core/net/message/InventoryMessage.java +++ b/src/main/java/org/tron/core/net/message/InventoryMessage.java @@ -78,8 +78,14 @@ public InventoryType getInventoryType() { @Override public String toString() { Deque hashes = new LinkedList<>(getHashList()); - return getType().toString() + ": First hash: " + hashes.peekFirst() - + " End hash: " + hashes.peekLast(); + StringBuilder builder = new StringBuilder(); + builder.append(getType()).append(":").append(getInvMessageType()) + .append(", size=").append(hashes.size()) + .append(", First hash:").append(hashes.peekFirst()); + if (hashes.size() > 1){ + builder.append(", End hash: ").append(hashes.peekLast()); + } + return builder.toString(); } private synchronized void unPack() { diff --git a/src/main/java/org/tron/core/net/node/NodeImpl.java b/src/main/java/org/tron/core/net/node/NodeImpl.java index 7d9de8549ec..552a9fa8828 100644 --- a/src/main/java/org/tron/core/net/node/NodeImpl.java +++ b/src/main/java/org/tron/core/net/node/NodeImpl.java @@ -1117,11 +1117,12 @@ private void syncNextBatchChainIds(PeerConnection peer) { @Override public void onConnectPeer(PeerConnection peer) { - //TODO:when use new p2p framework, remove this - logger.info("start sync with::" + peer); - peer.setTronState(TronState.SYNCING); - peer.setConnectTime(Time.getCurrentMillis()); - startSyncWithPeer(peer); + if (peer.getHelloMessage().getHeadBlockId().getNum() > del.getHeadBlockId().getNum()){ + peer.setTronState(TronState.SYNCING); + startSyncWithPeer(peer); + }else { + peer.setTronState(TronState.SYNC_COMPLETED); + } } @Override diff --git a/src/main/java/org/tron/core/net/peer/PeerConnection.java b/src/main/java/org/tron/core/net/peer/PeerConnection.java index 8940c14b7a6..51f167eb14c 100644 --- a/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -15,6 +15,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.tron.common.overlay.message.HelloMessage; import org.tron.common.overlay.message.Message; import org.tron.common.overlay.server.Channel; import org.tron.common.utils.Sha256Hash; @@ -29,23 +30,10 @@ @Scope("prototype") public class PeerConnection extends Channel { - @Override - public int hashCode() { - return super.hashCode(); - } - - public long getConnectTime() { - return connectTime; - } - - public void setConnectTime(long connectTime) { - this.connectTime = connectTime; - } - - private long connectTime; - private boolean syncFlag = true; + private HelloMessage helloMessage; + //broadcast private Queue invToUs = new LinkedBlockingQueue<>(); @@ -136,6 +124,13 @@ public void setAdvObjWeRequested(ConcurrentHashMap advObjWeReq this.advObjWeRequested = advObjWeRequested; } + public void setHelloMessage(HelloMessage helloMessage) { + this.helloMessage = helloMessage; + } + + public HelloMessage getHelloMessage(){ + return this.helloMessage; + } public void cleanInvGarbage() { long oldestTimestamp = @@ -230,9 +225,6 @@ public void setSyncFlag(boolean syncFlag){ } public String logSyncStats() { - //TODO: return tron sync status here. -// int waitResp = lastReqSentTime > 0 ? (int) (System.currentTimeMillis() - lastReqSentTime) / 1000 : 0; -// long lifeTime = System.currentTimeMillis() - connectedTime; return String.format( "Peer %s: [ %18s, ping %6s ms]-----------\n" + "connect time: %s\n" @@ -248,7 +240,7 @@ public String logSyncStats() { this.getNode().getHost() + ":" + this.getNode().getPort(), this.getNode().getHexIdShort(), (int) this.getPeerStats().getAvgLatency(), - Time.getTimeString(getConnectTime()), + Time.getTimeString(super.getStartTime()), headBlockWeBothHave.getNum(), isNeedSyncFromPeer(), isNeedSyncFromUs(), @@ -272,16 +264,7 @@ public boolean idle() { } public void sendMessage(Message message) { - if (!(message instanceof BlockMessage) - && !(message instanceof TransactionMessage)) { - logger.info("Send Message:" + message.toString() + " to\n" + this); - } msgQueue.sendMessage(message); nodeStatistics.tronOutMessage.add(); } - - @Override - public String toString() { - return super.toString();// nodeStatistics.toString(); - } } diff --git a/src/main/java/org/tron/core/net/peer/PeerConnectionDelegate.java b/src/main/java/org/tron/core/net/peer/PeerConnectionDelegate.java index 7de713fb471..2182f575858 100644 --- a/src/main/java/org/tron/core/net/peer/PeerConnectionDelegate.java +++ b/src/main/java/org/tron/core/net/peer/PeerConnectionDelegate.java @@ -14,7 +14,4 @@ public abstract class PeerConnectionDelegate { public abstract void onDisconnectPeer(PeerConnection peer); - - //public abstract gvoid onConnectionClosed(PeerConnection peer); - } diff --git a/src/main/protos/core/Tron.proto b/src/main/protos/core/Tron.proto index 888635e3971..567f106f075 100644 --- a/src/main/protos/core/Tron.proto +++ b/src/main/protos/core/Tron.proto @@ -270,7 +270,15 @@ message DisconnectMessage { } message HelloMessage { + message BlockId { + bytes hash = 1; + int64 number = 2; + } + Endpoint from = 1; int32 version = 2; int64 timestamp = 3; + BlockId genesisBlockId = 4; + BlockId solidBlockId = 5; + BlockId headBlockId = 6; }