From 2d0506ec01795e57ebf617a0f892cf16088947df Mon Sep 17 00:00:00 2001 From: wubin01 Date: Wed, 30 May 2018 20:42:30 +0800 Subject: [PATCH 1/5] add black user --- .../overlay/discover/NodeStatistics.java | 9 - .../tron/common/overlay/server/Channel.java | 393 +++++++++--------- .../common/overlay/server/ChannelManager.java | 93 +++-- .../overlay/server/HandshakeHandler.java | 2 +- .../tron/common/overlay/server/SyncPool.java | 55 ++- .../server/TronChannelInitializer.java | 7 - 6 files changed, 285 insertions(+), 274 deletions(-) diff --git a/src/main/java/org/tron/common/overlay/discover/NodeStatistics.java b/src/main/java/org/tron/common/overlay/discover/NodeStatistics.java index 5e725f836e4..73ef4f8b285 100644 --- a/src/main/java/org/tron/common/overlay/discover/NodeStatistics.java +++ b/src/main/java/org/tron/common/overlay/discover/NodeStatistics.java @@ -189,15 +189,6 @@ public boolean isReputationPenalized() { return false; } - public boolean isPenalized() { - return tronLastLocalDisconnectReason == ReasonCode.NULL_IDENTITY || - tronLastRemoteDisconnectReason == ReasonCode.NULL_IDENTITY || - tronLastLocalDisconnectReason == ReasonCode.BAD_PROTOCOL || - tronLastRemoteDisconnectReason == ReasonCode.BAD_PROTOCOL || - tronLastLocalDisconnectReason == ReasonCode.SYNC_FAIL || - tronLastRemoteDisconnectReason == ReasonCode.SYNC_FAIL; - } - public void nodeDisconnectedRemote(ReasonCode reason) { lastDisconnectedTime = System.currentTimeMillis(); tronLastRemoteDisconnectReason = reason; diff --git a/src/main/java/org/tron/common/overlay/server/Channel.java b/src/main/java/org/tron/common/overlay/server/Channel.java index 7c0a898cd3e..60f4c3fa1dd 100644 --- a/src/main/java/org/tron/common/overlay/server/Channel.java +++ b/src/main/java/org/tron/common/overlay/server/Channel.java @@ -22,8 +22,10 @@ import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Date; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,233 +49,248 @@ @Scope("prototype") public class Channel { - private final static Logger logger = LoggerFactory.getLogger("Channel"); + private final static Logger logger = LoggerFactory.getLogger("Channel"); - @Autowired - protected MessageQueue msgQueue; + @Autowired + protected MessageQueue msgQueue; - @Autowired - private MessageCodec messageCodec; + @Autowired + private MessageCodec messageCodec; - @Autowired - private NodeManager nodeManager; + @Autowired + private NodeManager nodeManager; - @Autowired - private StaticMessages staticMessages; + @Autowired + private StaticMessages staticMessages; - @Autowired - private WireTrafficStats stats; + @Autowired + private WireTrafficStats stats; - @Autowired - private HandshakeHandler handshakeHandler; + @Autowired + private HandshakeHandler handshakeHandler; - @Autowired - private P2pHandler p2pHandler; + @Autowired + private P2pHandler p2pHandler; - @Autowired - private TronHandler tronHandler; + @Autowired + private TronHandler tronHandler; - private ChannelManager channelManager; + private ChannelManager channelManager; - private ChannelHandlerContext ctx; + private ChannelHandlerContext ctx; - private InetSocketAddress inetSocketAddress; + private InetSocketAddress inetSocketAddress; - private Node node; + private Node node; - private long startTime; + private long startTime; - private PeerConnectionDelegate peerDel; + private PeerConnectionDelegate peerDel; - private TronState tronState = TronState.INIT; + private TronState tronState = TronState.INIT; - protected NodeStatistics nodeStatistics; + protected NodeStatistics nodeStatistics; - private boolean isActive; + private boolean isActive; - private volatile boolean isDisconnect; + private volatile boolean isDisconnect; - private String remoteId; + private String remoteId; - private PeerStatistics peerStats = new PeerStatistics(); + private PeerStatistics peerStats = new PeerStatistics(); - public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode, - ChannelManager channelManager, PeerConnectionDelegate peerDel) { + public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode, + ChannelManager channelManager, PeerConnectionDelegate peerDel) { - this.channelManager = channelManager; + this.channelManager = channelManager; - this.remoteId = remoteId; + this.remoteId = remoteId; - isActive = remoteId != null && !remoteId.isEmpty(); + isActive = remoteId != null && !remoteId.isEmpty(); - //TODO: use config here - pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60, TimeUnit.SECONDS)); - pipeline.addLast(stats.tcp); - pipeline.addLast("protoPender", new ProtobufVarint32LengthFieldPrepender()); - pipeline.addLast("lengthDecode", new TrxProtobufVarint32FrameDecoder(this)); + //TODO: use config here + pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60, TimeUnit.SECONDS)); + pipeline.addLast(stats.tcp); + pipeline.addLast("protoPender", new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast("lengthDecode", new TrxProtobufVarint32FrameDecoder(this)); - //handshake first - pipeline.addLast("handshakeHandler", handshakeHandler); + //handshake first + pipeline.addLast("handshakeHandler", handshakeHandler); - this.peerDel = peerDel; + this.peerDel = peerDel; - messageCodec.setChannel(this); - msgQueue.setChannel(this); - handshakeHandler.setChannel(this, remoteId); - p2pHandler.setChannel(this); - tronHandler.setChannel(this); + messageCodec.setChannel(this); + msgQueue.setChannel(this); + handshakeHandler.setChannel(this, remoteId); + p2pHandler.setChannel(this); + tronHandler.setChannel(this); - p2pHandler.setMsgQueue(msgQueue); - tronHandler.setMsgQueue(msgQueue); - tronHandler.setPeerDel(peerDel); + p2pHandler.setMsgQueue(msgQueue); + tronHandler.setMsgQueue(msgQueue); + tronHandler.setPeerDel(peerDel); - } - - public void publicHandshakeFinished(ChannelHandlerContext ctx, HelloMessage msg) { - ctx.pipeline().remove(handshakeHandler); - msgQueue.activate(ctx); - ctx.pipeline().addLast("messageCodec", messageCodec); - ctx.pipeline().addLast("p2p", p2pHandler); - ctx.pipeline().addLast("data", tronHandler); - setStartTime(msg.getTimestamp()); - setTronState(TronState.HANDSHAKE_FINISHED); - getNodeStatistics().p2pHandShake.add(); - logger.info("Finish handshake with {}.", ctx.channel().remoteAddress()); - } - - /** - * Set node and register it in NodeManager if it is not registered yet. - */ - public void initNode(byte[] nodeId, int remotePort) { - node = new Node(nodeId, inetSocketAddress.getHostString(), remotePort); - nodeStatistics = nodeManager.getNodeStatistics(node); - } - - public void disconnect(ReasonCode reason) { - this.isDisconnect = true; - DisconnectMessage msg = new DisconnectMessage(reason); - logger.info("Send to {}, {}", ctx.channel().remoteAddress(), msg); - getNodeStatistics().nodeDisconnectedLocal(reason); - ctx.writeAndFlush(msg.getSendData()).addListener(future -> close()); - } - - public void processException(Throwable throwable){ - Throwable baseThrowable = throwable; - while (baseThrowable.getCause() != null){ - baseThrowable = baseThrowable.getCause(); - } - String errMsg = throwable.getMessage(); - SocketAddress address = ctx.channel().remoteAddress(); - if (throwable instanceof ReadTimeoutException){ - logger.error("Read timeout, {}", address); - }else if(baseThrowable instanceof P2pException){ - logger.error("type: {}, info: {}, {}", ((P2pException) baseThrowable).getType(), baseThrowable.getMessage(), address); - }else if (errMsg != null && errMsg.contains("Connection reset by peer")){ - logger.error("{}, {}", errMsg, address); - }else { - logger.error("exception caught, {}", address, throwable); - } - close(); - } - - public void close(){ - this.isDisconnect = true; - p2pHandler.close(); - msgQueue.close(); - ctx.close(); - } - - public enum TronState { - INIT, - HANDSHAKE_FINISHED, - START_TO_SYNC, - SYNCING, - SYNC_COMPLETED, - SYNC_FAILED - } - - public PeerStatistics getPeerStats() { - return peerStats; - } - - public Node getNode() { - return node; - } - - public byte[] getNodeId() { - return node == null ? null : node.getId(); - } - - public ByteArrayWrapper getNodeIdWrapper() { - return node == null ? null : new ByteArrayWrapper(node.getId()); - } - - public String getPeerId() { - return node == null ? "" : node.getHexId(); - } - - public void setChannelHandlerContext(ChannelHandlerContext ctx){ - this.ctx = ctx; - this.inetSocketAddress = ctx == null ? null: (InetSocketAddress)ctx.channel().remoteAddress(); - } - - public ChannelHandlerContext getChannelHandlerContext(){ - return this.ctx; - } - - public NodeStatistics getNodeStatistics() { - return nodeStatistics; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } + } - public long getStartTime(){ - return startTime; - } + public void publicHandshakeFinished(ChannelHandlerContext ctx, HelloMessage msg) { + ctx.pipeline().remove(handshakeHandler); + msgQueue.activate(ctx); + ctx.pipeline().addLast("messageCodec", messageCodec); + ctx.pipeline().addLast("p2p", p2pHandler); + ctx.pipeline().addLast("data", tronHandler); + setStartTime(msg.getTimestamp()); + setTronState(TronState.HANDSHAKE_FINISHED); + getNodeStatistics().p2pHandShake.add(); + logger.info("Finish handshake with {}.", ctx.channel().remoteAddress()); + } - public void setTronState(TronState tronState) { - this.tronState = tronState; - } + /** + * Set node and register it in NodeManager if it is not registered yet. + */ + public void initNode(byte[] nodeId, int remotePort) { + node = new Node(nodeId, inetSocketAddress.getHostString(), remotePort); + nodeStatistics = nodeManager.getNodeStatistics(node); + } - public TronState getTronState() { - return tronState; - } + public void disconnect(ReasonCode reason) { + this.isDisconnect = true; + channelManager.processDisconnect(this, reason); + DisconnectMessage msg = new DisconnectMessage(reason); + logger.info("Send to {}, {}", ctx.channel().remoteAddress(), msg); + getNodeStatistics().nodeDisconnectedLocal(reason); + ctx.writeAndFlush(msg.getSendData()).addListener(future -> close()); + } - public boolean isActive() { - return isActive; + public void processException(Throwable throwable) { + Throwable baseThrowable = throwable; + while (baseThrowable.getCause() != null) { + baseThrowable = baseThrowable.getCause(); } - - public boolean isDisconnect(){ - return isDisconnect; + String errMsg = throwable.getMessage(); + SocketAddress address = ctx.channel().remoteAddress(); + if (throwable instanceof ReadTimeoutException) { + logger.error("Read timeout, {}", address); + } else if (baseThrowable instanceof P2pException) { + logger.error("type: {}, info: {}, {}", ((P2pException) baseThrowable).getType(), + baseThrowable.getMessage(), address); + } else if (errMsg != null && errMsg.contains("Connection reset by peer")) { + logger.error("{}, {}", errMsg, address); + } else { + logger.error("exception caught, {}", address, throwable); } - - public boolean isProtocolsInitialized() { - return tronState.ordinal() > TronState.INIT.ordinal(); + close(); + } + + public void close() { + this.isDisconnect = true; + p2pHandler.close(); + msgQueue.close(); + ctx.close(); + } + + public enum TronState { + INIT, + HANDSHAKE_FINISHED, + START_TO_SYNC, + SYNCING, + SYNC_COMPLETED, + SYNC_FAILED + } + + public PeerStatistics getPeerStats() { + return peerStats; + } + + public Node getNode() { + return node; + } + + public byte[] getNodeId() { + return node == null ? null : node.getId(); + } + + public ByteArrayWrapper getNodeIdWrapper() { + return node == null ? null : new ByteArrayWrapper(node.getId()); + } + + public String getPeerId() { + return node == null ? "" : node.getHexId(); + } + + public void setChannelHandlerContext(ChannelHandlerContext ctx) { + this.ctx = ctx; + this.inetSocketAddress = ctx == null ? null : (InetSocketAddress) ctx.channel().remoteAddress(); + } + + public ChannelHandlerContext getChannelHandlerContext() { + return this.ctx; + } + + public InetAddress getInetAddress() { + return ctx == null ? null : ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress(); + } + + public NodeStatistics getNodeStatistics() { + return nodeStatistics; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getStartTime() { + return startTime; + } + + public void setTronState(TronState tronState) { + this.tronState = tronState; + } + + public TronState getTronState() { + return tronState; + } + + public boolean isActive() { + return isActive; + } + + public boolean isDisconnect() { + return isDisconnect; + } + + public boolean isProtocolsInitialized() { + return tronState.ordinal() > TronState.INIT.ordinal(); + } + + @Override + public boolean equals(Object o) { + + if (this == o) { + return true; } - - @Override - public boolean equals(Object o) { - - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Channel channel = (Channel) o; - if (inetSocketAddress != null ? !inetSocketAddress.equals(channel.inetSocketAddress) : channel.inetSocketAddress != null) return false; - if (node != null ? !node.equals(channel.node) : channel.node != null) return false; - return this == channel; + if (o == null || getClass() != o.getClass()) { + return false; } - - @Override - public int hashCode() { - int result = inetSocketAddress != null ? inetSocketAddress.hashCode() : 0; - result = 31 * result + (node != null ? node.hashCode() : 0); - return result; + Channel channel = (Channel) o; + if (inetSocketAddress != null ? !inetSocketAddress.equals(channel.inetSocketAddress) + : channel.inetSocketAddress != null) { + return false; } - - @Override - public String toString() { - return String.format("%s | %s", inetSocketAddress, getPeerId()); + if (node != null ? !node.equals(channel.node) : channel.node != null) { + return false; } + return this == channel; + } + + @Override + public int hashCode() { + int result = inetSocketAddress != null ? inetSocketAddress.hashCode() : 0; + result = 31 * result + (node != null ? node.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return String.format("%s | %s", inetSocketAddress, getPeerId()); + } } diff --git a/src/main/java/org/tron/common/overlay/server/ChannelManager.java b/src/main/java/org/tron/common/overlay/server/ChannelManager.java index 2bebbd54916..7b63f312304 100644 --- a/src/main/java/org/tron/common/overlay/server/ChannelManager.java +++ b/src/main/java/org/tron/common/overlay/server/ChannelManager.java @@ -19,7 +19,10 @@ import static org.tron.protos.Protocol.ReasonCode.DUPLICATE_PEER; import static org.tron.protos.Protocol.ReasonCode.TOO_MANY_PEERS; +import static org.tron.protos.Protocol.ReasonCode.UNKNOWN; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -30,14 +33,19 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.collections4.map.LRUMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.tron.common.overlay.client.PeerClient; +import org.tron.common.overlay.discover.Node; +import org.tron.common.utils.Sha256Hash; import org.tron.core.config.args.Args; import org.tron.core.db.ByteArrayWrapper; +import org.tron.core.net.message.BlockMessage; +import org.tron.core.net.message.TransactionMessage; import org.tron.protos.Protocol.ReasonCode; @@ -50,8 +58,11 @@ public class ChannelManager { private final Map activePeers = new ConcurrentHashMap<>(); - private Map recentlyDisconnected = Collections - .synchronizedMap(new LRUMap(500)); + private Cache badPeers = CacheBuilder.newBuilder().maximumSize(10000) + .expireAfterWrite(1, TimeUnit.HOURS).recordStats().build(); + + private Cache recentlyDisconnected = CacheBuilder.newBuilder().maximumSize(1000) + .expireAfterWrite(30, TimeUnit.SECONDS).recordStats().build(); private Args args = Args.getInstance(); @@ -75,55 +86,53 @@ private ChannelManager(final PeerServer peerServer, final PeerClient peerClient) } } - public Set nodesInUse() { - Set ids = new HashSet<>(); - for (Channel peer : getActivePeers()) { - ids.add(peer.getPeerId()); + public void processDisconnect(Channel channel, ReasonCode reason){ + InetAddress inetAddress = channel.getInetAddress(); + if (inetAddress == null){ + return; + } + switch (reason){ + case FORKED: + case BAD_PROTOCOL: + case BAD_BLOCK: + case INCOMPATIBLE_CHAIN: + case INCOMPATIBLE_PROTOCOL: + badPeers.put(channel.getInetAddress(), reason); + break; + default: + recentlyDisconnected.put(channel.getInetAddress(), reason); + break; } - return ids; - } - - public void disconnect(Channel peer, ReasonCode reason) { - peer.disconnect(reason); - InetSocketAddress socketAddress = (InetSocketAddress) peer.getChannelHandlerContext().channel() - .remoteAddress(); - recentlyDisconnected.put(socketAddress.getAddress(), new Date()); } public void notifyDisconnect(Channel channel) { syncPool.onDisconnect(channel); activePeers.values().remove(channel); - if (channel == null || channel.getChannelHandlerContext() == null - || channel.getChannelHandlerContext().channel() == null) { - return; - } - if (channel.getNodeStatistics() != null) { - channel.getNodeStatistics().notifyDisconnect(); + if (channel != null) { + if (channel.getNodeStatistics() != null) { + channel.getNodeStatistics().notifyDisconnect(); + } + InetAddress inetAddress = channel.getInetAddress(); + if (inetAddress != null && recentlyDisconnected.getIfPresent(inetAddress) != null){ + recentlyDisconnected.put(channel.getInetAddress(), UNKNOWN); + } } - InetSocketAddress socketAddress = (InetSocketAddress) channel.getChannelHandlerContext() - .channel().remoteAddress(); - recentlyDisconnected.put(socketAddress.getAddress(), new Date()); } - public boolean isRecentlyDisconnected(InetAddress peerAddr) { - Date disconnectTime = recentlyDisconnected.get(peerAddr); - if (disconnectTime != null && - System.currentTimeMillis() - disconnectTime.getTime() < inboundConnectionBanTimeout) { - return true; - } else { - recentlyDisconnected.remove(peerAddr); + public synchronized boolean processPeer(Channel peer) { + + if (recentlyDisconnected.getIfPresent(peer) != null){ + logger.info("Peer {} recently disconnected.", peer.getInetAddress()); return false; } - } - public synchronized boolean procPeer(Channel peer) { - if (peer.getNodeStatistics().isPenalized()) { - disconnect(peer, peer.getNodeStatistics().getDisconnectReason()); + if (badPeers.getIfPresent(peer) != null) { + peer.disconnect(peer.getNodeStatistics().getDisconnectReason()); return false; } if (!peer.isActive() && activePeers.size() >= maxActivePeers) { - disconnect(peer, TOO_MANY_PEERS); + peer.disconnect(TOO_MANY_PEERS); return false; } @@ -131,9 +140,9 @@ public synchronized boolean procPeer(Channel peer) { Channel channel = activePeers.get(peer.getNodeIdWrapper()); if (channel.getStartTime() > peer.getStartTime()) { logger.info("Disconnect connection established later, {}", channel.getNode()); - disconnect(channel, DUPLICATE_PEER); + channel.disconnect(DUPLICATE_PEER); } else { - disconnect(peer, DUPLICATE_PEER); + peer.disconnect(DUPLICATE_PEER); return false; } } @@ -143,7 +152,15 @@ public synchronized boolean procPeer(Channel peer) { } public Collection getActivePeers() { - return new ArrayList<>(activePeers.values()); + return activePeers.values(); + } + + public Cache getRecentlyDisconnected(){ + return this.recentlyDisconnected; + } + + public Cache getBadPeers(){ + return this.badPeers; } public void close() { diff --git a/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java b/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java index d65f28b596e..fa84ad9d5d4 100644 --- a/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java +++ b/src/main/java/org/tron/common/overlay/server/HandshakeHandler.java @@ -157,7 +157,7 @@ private void handleHelloMsg(ChannelHandlerContext ctx, HelloMessage msg) { channel.getNodeStatistics().p2pInHello.add(); channel.publicHandshakeFinished(ctx, msg); - if (!channelManager.procPeer(channel)) { + if (!channelManager.processPeer(channel)) { return; } diff --git a/src/main/java/org/tron/common/overlay/server/SyncPool.java b/src/main/java/org/tron/common/overlay/server/SyncPool.java index 9dfad04904a..56c2964555e 100644 --- a/src/main/java/org/tron/common/overlay/server/SyncPool.java +++ b/src/main/java/org/tron/common/overlay/server/SyncPool.java @@ -19,6 +19,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -53,19 +54,18 @@ public class SyncPool { private static final long WORKER_TIMEOUT = 16; private static final double fator = 0.4; - private final List activePeers = Collections.synchronizedList(new ArrayList()); + private final List activePeers = Collections + .synchronizedList(new ArrayList()); private final AtomicInteger passivePeersCount = new AtomicInteger(0); private final AtomicInteger activePeersCount = new AtomicInteger(0); private Cache nodeHandlerCache = CacheBuilder.newBuilder() - .maximumSize(1000).expireAfterWrite(120, TimeUnit.SECONDS).recordStats().build(); + .maximumSize(1000).expireAfterWrite(120, TimeUnit.SECONDS).recordStats().build(); @Autowired private NodeManager nodeManager; @Autowired - private ApplicationContext ctx; - private ChannelManager channelManager; private PeerConnectionDelegate peerDel; @@ -88,8 +88,6 @@ public SyncPool(PeerClient peerClient) { public void init(PeerConnectionDelegate peerDel) { this.peerDel = peerDel; - channelManager = ctx.getBean(ChannelManager.class); - poolLoopExecutor.scheduleWithFixedDelay(() -> { try { fillUp(); @@ -101,15 +99,19 @@ public void init(PeerConnectionDelegate peerDel) { logExecutor.scheduleWithFixedDelay(() -> { try { logActivePeers(); - } catch (Throwable t) {} - }, 10, 10, TimeUnit.SECONDS); + } catch (Throwable t) { + } + }, 30, 10, TimeUnit.SECONDS); } private void fillUp() { int lackSize = (int) (maxActiveNodes * fator) - activePeers.size(); - if(lackSize <= 0) return; + if (lackSize <= 0) { + return; + } - final Set nodesInUse = channelManager.nodesInUse(); + final Set nodesInUse = new HashSet<>(); + channelManager.getActivePeers().forEach(channel -> nodesInUse.add(channel.getPeerId())); nodesInUse.add(nodeManager.getPublicHomeNode().getHexId()); List newNodes = nodeManager.getNodes(new NodeSelector(nodesInUse), lackSize); @@ -127,19 +129,10 @@ public void addActivePeers(PeerConnection p) { synchronized void logActivePeers() { - logger.info("-------- active node {}", nodeManager.dumpActiveNodes().size()); - nodeManager.dumpActiveNodes().forEach(handler -> { - if (handler.getNode().getPort() == 18888) { - logger.info("address: {}:{}, ID:{} {}", - handler.getNode().getHost(), handler.getNode().getPort(), - handler.getNode().getHexIdShort(), handler.getNodeStatistics().toString()); - } - }); - logger.info("-------- active connect channel {}", activePeersCount.get()); logger.info("-------- passive connect channel {}", passivePeersCount.get()); logger.info("-------- all connect channel {}", channelManager.getActivePeers().size()); - for (Channel channel: channelManager.getActivePeers()){ + for (Channel channel : channelManager.getActivePeers()) { logger.info(channel.toString()); } @@ -166,7 +159,7 @@ synchronized void logActivePeers() { public synchronized List getActivePeers() { List peers = Lists.newArrayList(); activePeers.forEach(peer -> { - if (!peer.isDisconnect()){ + if (!peer.isDisconnect()) { peers.add(peer); } }); @@ -194,7 +187,7 @@ public synchronized void onDisconnect(Channel peer) { activePeersCount.decrementAndGet(); } activePeers.remove(peer); - peerDel.onDisconnectPeer((PeerConnection)peer); + peerDel.onDisconnectPeer((PeerConnection) peer); } } @@ -225,24 +218,24 @@ public NodeSelector(Set nodesInUse) { @Override public boolean test(NodeHandler handler) { -// if (!nodeManager.isNodeAlive(handler)){ -// return false; -// } - if (handler.getNode().getHost().equals(nodeManager.getPublicHomeNode().getHost()) && - handler.getNode().getPort() == nodeManager.getPublicHomeNode().getPort()) { + handler.getNode().getPort() == nodeManager.getPublicHomeNode().getPort()) { return false; } - if (channelManager.isRecentlyDisconnected(handler.getInetSocketAddress().getAddress())){ - return false; + InetAddress inetAddress = handler.getInetSocketAddress().getAddress(); + if (channelManager.getRecentlyDisconnected().getIfPresent(inetAddress) != null) { + return false; + } + if (channelManager.getBadPeers().getIfPresent(inetAddress) != null) { + return false; } if (nodesInUse != null && nodesInUse.contains(handler.getNode().getHexId())) { return false; } - if (nodeHandlerCache.getIfPresent(handler) != null){ + if (nodeHandlerCache.getIfPresent(handler) != null) { return false; } @@ -250,7 +243,7 @@ public boolean test(NodeHandler handler) { return false; } - return true; + return true; } } diff --git a/src/main/java/org/tron/common/overlay/server/TronChannelInitializer.java b/src/main/java/org/tron/common/overlay/server/TronChannelInitializer.java index 694e0e5cf69..8aebeda4b52 100644 --- a/src/main/java/org/tron/common/overlay/server/TronChannelInitializer.java +++ b/src/main/java/org/tron/common/overlay/server/TronChannelInitializer.java @@ -60,13 +60,6 @@ public TronChannelInitializer(String remoteId) { @Override public void initChannel(NioSocketChannel ch) throws Exception { try { - if (isInbound() && channelManager.isRecentlyDisconnected(ch.remoteAddress().getAddress())) { - // avoid too frequent connection attempts - logger.info("Drop connection - the same IP was disconnected recently, channel: {}", ch.toString()); - ch.disconnect(); - return; - } - final Channel channel = ctx.getBean(PeerConnection.class); channel.init(ch.pipeline(), remoteId, peerDiscoveryMode, channelManager, p2pNode); From 703518c15e707d1952b5b6c9e0c5ce6ccc6f0280 Mon Sep 17 00:00:00 2001 From: wubin01 Date: Wed, 30 May 2018 20:48:45 +0800 Subject: [PATCH 2/5] mdf recentlyDisconnected --- .../java/org/tron/common/overlay/server/ChannelManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tron/common/overlay/server/ChannelManager.java b/src/main/java/org/tron/common/overlay/server/ChannelManager.java index 7b63f312304..df9bae8cd5a 100644 --- a/src/main/java/org/tron/common/overlay/server/ChannelManager.java +++ b/src/main/java/org/tron/common/overlay/server/ChannelManager.java @@ -113,7 +113,7 @@ public void notifyDisconnect(Channel channel) { channel.getNodeStatistics().notifyDisconnect(); } InetAddress inetAddress = channel.getInetAddress(); - if (inetAddress != null && recentlyDisconnected.getIfPresent(inetAddress) != null){ + if (inetAddress != null && recentlyDisconnected.getIfPresent(inetAddress) == null){ recentlyDisconnected.put(channel.getInetAddress(), UNKNOWN); } } From d4a98960b0192177502d1dbc3a453d1285a03e2b Mon Sep 17 00:00:00 2001 From: wubin01 Date: Wed, 30 May 2018 21:00:15 +0800 Subject: [PATCH 3/5] mdf sync start time --- src/main/java/org/tron/common/overlay/server/SyncPool.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/tron/common/overlay/server/SyncPool.java b/src/main/java/org/tron/common/overlay/server/SyncPool.java index 56c2964555e..194188bc720 100644 --- a/src/main/java/org/tron/common/overlay/server/SyncPool.java +++ b/src/main/java/org/tron/common/overlay/server/SyncPool.java @@ -51,11 +51,9 @@ public class SyncPool { public static final Logger logger = LoggerFactory.getLogger("SyncPool"); - private static final long WORKER_TIMEOUT = 16; private static final double fator = 0.4; - private final List activePeers = Collections - .synchronizedList(new ArrayList()); + private final List activePeers = Collections.synchronizedList(new ArrayList()); private final AtomicInteger passivePeersCount = new AtomicInteger(0); private final AtomicInteger activePeersCount = new AtomicInteger(0); @@ -94,7 +92,7 @@ public void init(PeerConnectionDelegate peerDel) { } catch (Throwable t) { logger.error("Exception in sync worker", t); } - }, WORKER_TIMEOUT, WORKER_TIMEOUT, TimeUnit.SECONDS); + }, 30, 16, TimeUnit.SECONDS); logExecutor.scheduleWithFixedDelay(() -> { try { From 7bf5bc389bd2d713d7abeea2947119826b8fc128 Mon Sep 17 00:00:00 2001 From: sean-liu55 Date: Thu, 31 May 2018 15:21:44 +0800 Subject: [PATCH 4/5] use new method --- src/main/java/org/tron/core/db/BandwidthProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tron/core/db/BandwidthProcessor.java b/src/main/java/org/tron/core/db/BandwidthProcessor.java index 54d5372371e..38545ba0a9c 100644 --- a/src/main/java/org/tron/core/db/BandwidthProcessor.java +++ b/src/main/java/org/tron/core/db/BandwidthProcessor.java @@ -126,7 +126,7 @@ private boolean consumeFee(AccountCapsule accountCapsule, long fee) { try { long latestOperationTime = dbManager.getHeadBlockTimeStamp(); accountCapsule.setLatestOperationTime(latestOperationTime); - dbManager.adjustBalance(accountCapsule.createDbKey(), -fee); + dbManager.adjustBalance(accountCapsule, -fee); return true; } catch (BalanceInsufficientException e) { return false; From fe599e8d28f3f5163fdcb14e9fd2ba24f969559e Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 31 May 2018 15:29:28 +0800 Subject: [PATCH 5/5] mdf TcpNetTest.java --- src/test/java/org/tron/core/net/node/TcpNetTest.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java index b48eeb6de86..d9796c4a676 100644 --- a/src/test/java/org/tron/core/net/node/TcpNetTest.java +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -7,6 +7,7 @@ import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_CHAIN; import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_PROTOCOL; +import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -19,6 +20,7 @@ import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.map.LRUMap; import org.apache.commons.lang3.ArrayUtils; @@ -233,8 +235,8 @@ private void validResultCloseConnect(Channel channel) throws InterruptedExceptio finish = false; channel.close(); Thread.sleep(sleepTime); - ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections - .synchronizedMap(new LRUMap(500))); + ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", CacheBuilder.newBuilder().maximumSize(1000) + .expireAfterWrite(30, TimeUnit.SECONDS).recordStats().build()); ReflectUtils.setFieldValue(pool, "activePeers", Collections.synchronizedList(new ArrayList())); ReflectUtils.setFieldValue(channelManager, "activePeers", new ConcurrentHashMap<>()); @@ -252,8 +254,8 @@ private void validResultUnCloseConnect() throws InterruptedException { private void clearConnect(Channel channel) throws InterruptedException { channel.close(); Thread.sleep(sleepTime); - ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections - .synchronizedMap(new LRUMap(500))); + ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", CacheBuilder.newBuilder().maximumSize(1000) + .expireAfterWrite(30, TimeUnit.SECONDS).recordStats().build()); ReflectUtils.setFieldValue(pool, "activePeers", Collections.synchronizedList(new ArrayList())); ReflectUtils.setFieldValue(channelManager, "activePeers", new ConcurrentHashMap<>());