From ef7c93b73ae78353e26694518ff28f926e9f83ad Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 25 Oct 2024 17:45:14 -0400 Subject: [PATCH] network: handle p2p to ws messages propagation --- data/txHandler.go | 3 + node/node_test.go | 147 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index ec3a84cc1f..e156417f79 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -858,6 +858,9 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if err != nil { logging.Base().Infof("unable to pin transaction: %v", err) } + + handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) + return network.OutgoingMessage{ Action: network.Accept, } diff --git a/node/node_test.go b/node/node_test.go index 387ea58aa8..8cee3f9a11 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -861,7 +861,7 @@ func TestMaxSizesCorrect(t *testing.T) { // N -- R -- A and ensures N can discover A and download blocks from it. // // N is a non-part node that joins the network later -// R is a non-archival relay node with block service disabled. It MUST NOT service blocks to force N to discover A. +// R is a non-archival relay node with block service disabled. It MUST NOT serve blocks to force N to discover A. // A is a archival node that can only provide blocks. // Nodes N and A have only R in their initial phonebook, and all nodes are in hybrid mode. func TestNodeHybridTopology(t *testing.T) { @@ -1112,3 +1112,148 @@ func TestNodeSetCatchpointCatchupMode(t *testing.T) { }) } } + +// TestNodeHybridP2PGossipSend set ups 3 nodes network with the following topology: +// N0 -- R -- N2 where N0 is wsnet only, R is a relay hybrid node, and N2 is p2pnet only. +// +// N0 is the only blocks producer, and N2 is the only transaction supplier. +// Test ensures that a hybrid R relay can properly deliver transactions to N0. +func TestNodeHybridP2PGossipSend(t *testing.T) { + partitiontest.PartitionTest(t) + + const consensusTest0 = protocol.ConsensusVersion("test0") + + configurableConsensus := make(config.ConsensusProtocols) + + testParams0 := config.Consensus[protocol.ConsensusCurrentVersion] + testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond + configurableConsensus[consensusTest0] = testParams0 + + // configure the stake to have R and A producing and confirming blocks + const totalStake = 100_000_000_000 + const npnStake = 1_000_000 + const nodeStake = totalStake - npnStake + const numAccounts = 3 + acctStake := make([]basics.MicroAlgos, numAccounts) + acctStake[0] = basics.MicroAlgos{Raw: nodeStake} + acctStake[1] = basics.MicroAlgos{} + acctStake[2] = basics.MicroAlgos{Raw: npnStake} + + configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) { + cfg = config.GetDefaultLocal() + if ni.idx != 1 { + cfg.EnableBlockService = false + cfg.EnableGossipBlockService = false + cfg.EnableLedgerService = false + cfg.CatchpointInterval = 0 + cfg.Archival = false + cfg.NetAddress = "" + } else { + // node 1 is archival + cfg.EnableBlockService = true + cfg.EnableGossipBlockService = true + cfg.NetAddress = ni.wsNetAddr() + cfg.EnableP2PHybridMode = true + cfg.PublicAddress = ni.wsNetAddr() + cfg.P2PPersistPeerID = true + privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) + require.NoError(t, err) + ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) + require.NoError(t, err) + + cfg.P2PHybridNetAddress = ni.p2pNetAddr() + } + if ni.idx == 2 { + // node 2 is p2p only + cfg.EnableP2PHybridMode = false + cfg.EnableP2P = true + } + return ni, cfg + } + + phonebookHook := func(ni []nodeInfo, i int) []string { + switch i { + case 0: + // node 0 (N0) connects to R + t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr()) + return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()} + case 1: + // node 1 (R) is a relay accepting connections from all + t.Logf("Node%d phonebook: empty", i) + return []string{} + case 2: + // node 2 (A) connects to R + t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr()) + return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()} + default: + t.Errorf("not expected number of nodes: %d", i) + t.FailNow() + } + return nil + } + + nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook) + require.Len(t, nodes, 3) + require.Len(t, wallets, 3) + for i := 0; i < len(nodes); i++ { + defer os.Remove(wallets[i]) + defer nodes[i].Stop() + } + + startAndConnectNodes(nodes, nodelayFirstNodeStartDelay) + + // ensure the initial connectivity topology + require.Eventually(t, func() bool { + node0Conn := len(nodes[0].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1 + node1Conn := len(nodes[1].net.GetPeers(network.PeersConnectedOut, network.PeersConnectedIn)) == 2 // connected from 0 and 2 + node2Conn := len(nodes[2].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1 + return node0Conn && node1Conn && node2Conn + }, 60*time.Second, 500*time.Millisecond) + + filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2]) + access, err := db.MakeAccessor(filename, false, false) + require.NoError(t, err) + root, err := account.RestoreRoot(access) + access.Close() + require.NoError(t, err) + + addr2 := root.Address() + secrets2 := root.Secrets() + + txn := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addr2, + FirstValid: 1, + LastValid: 100, + Fee: basics.MicroAlgos{Raw: 1000}, + GenesisID: nodes[2].genesisID, + GenesisHash: nodes[2].genesisHash, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addr2, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + signature := secrets2.Sign(txn) + stxn := transactions.SignedTxn{ + Sig: signature, + Txn: txn, + } + + err = nodes[2].BroadcastSignedTxGroup([]transactions.SignedTxn{stxn}) + require.NoError(t, err) + + initialRound := nodes[0].ledger.NextRound() + targetRound := initialRound + 10 + + // ensure discovery of archival node by tracking its ledger + select { + case <-nodes[2].ledger.Wait(targetRound): + b, err := nodes[0].ledger.Block(targetRound) + require.NoError(t, err) + require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources + case <-time.After(3 * time.Minute): // set it to 1.5x of the dht.periodicBootstrapInterval to give DHT code to rebuild routing table one more time + require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0])) + } +}