Skip to content

Commit

Permalink
Merge pull request #640 from tronprotocol/pp2p_handshake
Browse files Browse the repository at this point in the history
p2p: add handshake
  • Loading branch information
huzhenyuan authored May 7, 2018
2 parents db4ae6f + 7870ec7 commit 8074a47
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 151 deletions.
74 changes: 40 additions & 34 deletions src/main/java/org/tron/common/overlay/message/HelloMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.overlay.discover.Node;
import org.tron.common.utils.ByteArray;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.args.Args;
import org.tron.core.db.Manager;
import org.tron.core.net.message.MessageTypes;
import org.tron.protos.Discover.Endpoint;
import org.tron.protos.Protocol;
Expand Down Expand Up @@ -33,19 +36,38 @@ public HelloMessage(byte type, byte[] rawData) {
/**
* Create hello message.
*/
public HelloMessage(Node from, long timestamp) {
public HelloMessage(Node from, long timestamp, BlockCapsule.BlockId genesisBlockId,
BlockCapsule.BlockId solidBlockId, BlockCapsule.BlockId headBlockId){

Endpoint fromEndpoint = Endpoint.newBuilder()
.setNodeId(ByteString.copyFrom(from.getId()))
.setPort(from.getPort())
.setAddress(ByteString.copyFrom(ByteArray.fromString(from.getHost())))
.build();

Protocol.HelloMessage.BlockId gBlockId = Protocol.HelloMessage.BlockId.newBuilder()
.setHash(genesisBlockId.getByteString())
.setNumber(genesisBlockId.getNum())
.build();

Protocol.HelloMessage.BlockId sBlockId = Protocol.HelloMessage.BlockId.newBuilder()
.setHash(solidBlockId.getByteString())
.setNumber(solidBlockId.getNum())
.build();

Protocol.HelloMessage.BlockId hBlockId = Protocol.HelloMessage.BlockId.newBuilder()
.setHash(headBlockId.getByteString())
.setNumber(headBlockId.getNum())
.build();

Builder builder = Protocol.HelloMessage.newBuilder();

builder.setFrom(fromEndpoint);
builder.setVersion(Args.getInstance().getNodeP2pVersion());
builder.setTimestamp(timestamp);
builder.setGenesisBlockId(gBlockId);
builder.setSolidBlockId(sBlockId);
builder.setHeadBlockId(hBlockId);

this.helloMessage = builder.build();
this.type = MessageTypes.P2P_HELLO.asByte();
Expand Down Expand Up @@ -76,41 +98,25 @@ public long getTimestamp(){
return this.helloMessage.getTimestamp();
}

/**
* Get listen port.
*/
public int getListenPort() {
return this.helloMessage.getFrom().getPort();
public Node getFrom() {
Endpoint from = this.helloMessage.getFrom();
return new Node(from.getNodeId().toByteArray(),
ByteArray.toStr(from.getAddress().toByteArray()), from.getPort());
}

/**
* Get peer ID.
*/
public String getPeerId() {
return ByteArray.toHexString(this.helloMessage.getFrom().getNodeId().toByteArray());
public BlockCapsule.BlockId getGenesisBlockId(){
return new BlockCapsule.BlockId(this.helloMessage.getGenesisBlockId().getHash(),
this.helloMessage.getGenesisBlockId().getNumber());
}

/**
* Set version of p2p protocol.
*/
public void setVersion(byte version) {
Builder builder = this.helloMessage.toBuilder();
builder.setVersion(version);
this.helloMessage = builder.build();
public BlockCapsule.BlockId getSolidBlockId(){
return new BlockCapsule.BlockId(this.helloMessage.getSolidBlockId().getHash(),
this.helloMessage.getSolidBlockId().getNumber());
}

/**
* Get string.
*/
public String toString() {
return helloMessage.toString();
// if (!this.unpacked) {
// this.unPack();
// }
// return "[" + this.getCommand().name() + " p2pVersion="
// + this.getVersion() + " clientId=" + this.getClientId()
// + " peerPort=" + this.getListenPort() + " peerId="
// + this.getPeerId() + "]";
public BlockCapsule.BlockId getHeadBlockId(){
return new BlockCapsule.BlockId(this.helloMessage.getHeadBlockId().getHash(),
this.helloMessage.getHeadBlockId().getNumber());
}

@Override
Expand All @@ -123,9 +129,9 @@ public MessageTypes getType() {
return MessageTypes.fromByte(this.type);
}

public Node getFrom() {
Endpoint from = this.helloMessage.getFrom();
return new Node(from.getNodeId().toByteArray(),
ByteArray.toStr(from.getAddress().toByteArray()), from.getPort());
@Override
public String toString() {
return helloMessage.toString();
}

}
2 changes: 0 additions & 2 deletions src/main/java/org/tron/common/overlay/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public String toString() {

public abstract Class<?> getAnswerMessage();

//public byte getCode() { return type; }

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
61 changes: 0 additions & 61 deletions src/main/java/org/tron/common/overlay/message/StaticMessages.java
Original file line number Diff line number Diff line change
@@ -1,70 +1,9 @@
/*
* Copyright (c) [2016] [ <ether.camp> ]
* This file is part of the ethereumJ library.
*
* The ethereumJ library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The ethereumJ library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the ethereumJ library. If not, see <http://www.gnu.org/licenses/>.
*/
package org.tron.common.overlay.message;

import org.spongycastle.util.encoders.Hex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.overlay.discover.Node;
import org.tron.core.config.args.Args;

/**
* This class contains static values of messages on the network. These message will always be the
* same and therefore don't need to be created each time.
*
* @author Roman Mandeleil
* @since 13.04.14
*/

@Component
public class StaticMessages {

@Autowired
Args args;

public final static PingMessage PING_MESSAGE = new PingMessage();
public final static PongMessage PONG_MESSAGE = new PongMessage();
public final static DisconnectMessage DISCONNECT_MESSAGE = new DisconnectMessage(
ReasonCode.REQUESTED);

public static final byte[] SYNC_TOKEN = Hex.decode("22400891");

public HelloMessage createHelloMessage(Node node, long timestamp) {
return new HelloMessage(node, timestamp);
}

private String buildHelloAnnouncement() {
return "java-tron";
// String version = config.projectVersion();
// String numberVersion = version;
// Pattern pattern = Pattern.compile("^\\d+(\\.\\d+)*");
// Matcher matcher = pattern.matcher(numberVersion);
// if (matcher.find()) {
// numberVersion = numberVersion.substring(matcher.start(), matcher.end());
// }
// String system = System.getProperty("os.name");
// if (system.contains(" "))
// system = system.substring(0, system.indexOf(" "));
// if (System.getProperty("java.vm.vendor").contains("Android"))
// system = "Android";
// String phrase = config.helloPhrase();
//
// return String.format("Ethereum(J)/v%s/%s/%s/Java/%s", numberVersion, system,
// config.projectVersionModifier().equalsIgnoreCase("release") ? "Release" : "Dev", phrase);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/tron/common/overlay/server/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void initNode(byte[] nodeId, int remotePort) {
}

public void disconnect(ReasonCode reason) {
logger.info("Send disconnect {}, reason:{}", ctx.channel().remoteAddress(), reason);
logger.info("Send disconnect to {}, reason:{}", ctx.channel().remoteAddress(), reason);
getNodeStatistics().nodeDisconnectedLocal(reason);
ctx.writeAndFlush(new DisconnectMessage(reason).getSendData());
close();
Expand Down
50 changes: 39 additions & 11 deletions src/main/java/org/tron/common/overlay/server/HandshakeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.codec.ByteToMessageDecoder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +34,8 @@
import org.tron.common.overlay.message.*;
import org.tron.common.utils.ByteArray;
import org.tron.core.config.args.Args;
import org.tron.core.db.Manager;
import org.tron.core.net.peer.PeerConnection;

import static org.tron.common.overlay.message.StaticMessages.PONG_MESSAGE;

Expand All @@ -48,14 +51,14 @@ public class HandshakeHandler extends ByteToMessageDecoder {

private final NodeManager nodeManager;

private final ChannelManager channelManager;
private Manager manager;

private P2pMessageFactory messageFactory = new P2pMessageFactory();

@Autowired
public HandshakeHandler(final NodeManager nodeManager, final ChannelManager channelManager) {
public HandshakeHandler(final NodeManager nodeManager, final Manager manager) {
this.nodeManager = nodeManager;
this.channelManager = channelManager;
this.manager = manager;
}

@Override
Expand All @@ -73,6 +76,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
byte[] encoded = new byte[buffer.readableBytes()];
buffer.readBytes(encoded);
P2pMessage msg = messageFactory.create(encoded);

logger.info("Handshake receive from {}, {}", ctx.channel().remoteAddress(), msg);

switch (msg.getType()) {
case P2P_HELLO:
handleHelloMsg(ctx, (HelloMessage)msg);
Expand Down Expand Up @@ -100,22 +106,44 @@ public void setChannel(Channel channel, String remoteId) {
}

private void sendHelloMsg(ChannelHandlerContext ctx, long time){
ctx.writeAndFlush(new HelloMessage(nodeManager.getPublicHomeNode(), time).getSendData());
HelloMessage message = new HelloMessage(nodeManager.getPublicHomeNode(), time,
manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId());
ctx.writeAndFlush(message.getSendData());
channel.getNodeStatistics().p2pOutHello.add();
}

private void handleHelloMsg(ChannelHandlerContext ctx, HelloMessage msg) {
if (remoteId.length != 64) {
channel.initNode(ByteArray.fromHexString(msg.getPeerId()), msg.getListenPort());
if (msg.getVersion() != Args.getInstance().getNodeP2pVersion()) {
logger.info("Peer {} version not support, peer->{}, me->{}",
ctx.channel().remoteAddress(), msg.getVersion(), Args.getInstance().getNodeP2pVersion());
channel.disconnect(ReasonCode.INCOMPATIBLE_PROTOCOL);
return;
}
channel.initNode(msg.getFrom().getId(), msg.getFrom().getPort());
}

if (msg.getVersion() != Args.getInstance().getNodeP2pVersion()) {
logger.info("Peer {} different p2p version, peer->{}, me->{}",
ctx.channel().remoteAddress(), msg.getVersion(), Args.getInstance().getNodeP2pVersion());
channel.disconnect(ReasonCode.INCOMPATIBLE_PROTOCOL);
return;
}

if (!Arrays.equals(manager.getGenesisBlockId().getBytes(), msg.getGenesisBlockId().getBytes())){
logger.info("Peer {} different genesis block, peer->{}, me->{}", ctx.channel().remoteAddress(),
msg.getGenesisBlockId().getString(), manager.getGenesisBlockId().getString());
channel.disconnect(ReasonCode.INCOMPATIBLE_CHAIN);
return;
}

if (manager.getSolidBlockId().getNum() >= msg.getSolidBlockId().getNum() && !manager.containBlockInMainChain(msg.getSolidBlockId())){
logger.info("Peer {} different solid block, peer->{}, me->{}", ctx.channel().remoteAddress(),
msg.getSolidBlockId().getString(), manager.getSolidBlockId().getString());
channel.disconnect(ReasonCode.FORKED);
return;
}

if (remoteId.length != 64) {
sendHelloMsg(ctx, msg.getTimestamp());
}

((PeerConnection)channel).setHelloMessage(msg);

channel.getNodeStatistics().p2pInHello.add();

channel.publicHandshakeFinished(ctx, msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void send() {
}
if (messageRoundtrip.getRetryTimes() > 0){
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.PING_TIMEOUT);
logger.warn("wait {} timeout. close channel {}.", messageRoundtrip.getMsg().getAnswerMessage(), ctx.channel().remoteAddress());
logger.warn("Wait {} timeout. close channel {}.", messageRoundtrip.getMsg().getAnswerMessage(), ctx.channel().remoteAddress());
channel.close();
return;
}
Expand All @@ -124,8 +124,6 @@ private void send() {

ctx.writeAndFlush(msg.getSendData());

logger.info("send {} to {}", msg.getType(), ctx.channel().remoteAddress());

messageRoundtrip.incRetryTimes();
messageRoundtrip.saveTime();
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,15 @@ public long getSyncBeginNumber() {
return dynamicPropertiesStore.getLatestBlockHeaderNumber() - revokingStore.size();
}

public BlockId getSolidBlockId() {
try{
long num = dynamicPropertiesStore.getLatestSolidifiedBlockNum();
return getBlockIdByNum(num);
}catch (Exception e){
return getGenesisBlockId();
}
}

/**
* Determine if the current time is maintenance time.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public FetchInvDataMessage(List<Sha256Hash> hashList, InventoryType type) {

@Override
public String toString() {
return super.toString() + ", type=" + MessageTypes.fromByte(this.type);
return super.toString();
}

public MessageTypes getInvType() {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/tron/core/net/message/InventoryMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,14 @@ public InventoryType getInventoryType() {
@Override
public String toString() {
Deque<Sha256Hash> hashes = new LinkedList<>(getHashList());
return getType().toString() + ": First hash: " + hashes.peekFirst()
+ " End hash: " + hashes.peekLast();
StringBuilder builder = new StringBuilder();
builder.append(getType()).append(":").append(getInvMessageType())
.append(", size=").append(hashes.size())
.append(", First hash:").append(hashes.peekFirst());
if (hashes.size() > 1){
builder.append(", End hash: ").append(hashes.peekLast());
}
return builder.toString();
}

private synchronized void unPack() {
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/tron/core/net/node/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1117,11 +1117,12 @@ private void syncNextBatchChainIds(PeerConnection peer) {

@Override
public void onConnectPeer(PeerConnection peer) {
//TODO:when use new p2p framework, remove this
logger.info("start sync with::" + peer);
peer.setTronState(TronState.SYNCING);
peer.setConnectTime(Time.getCurrentMillis());
startSyncWithPeer(peer);
if (peer.getHelloMessage().getHeadBlockId().getNum() > del.getHeadBlockId().getNum()){
peer.setTronState(TronState.SYNCING);
startSyncWithPeer(peer);
}else {
peer.setTronState(TronState.SYNC_COMPLETED);
}
}

@Override
Expand Down
Loading

0 comments on commit 8074a47

Please sign in to comment.