Skip to content

Commit

Permalink
network: handle p2p to ws messages propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Oct 25, 2024
1 parent 39f7485 commit ef7c93b
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 1 deletion.
3 changes: 3 additions & 0 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,9 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)
}

handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender)

Check failure on line 862 in data/txHandler.go

View workflow job for this annotation

GitHub Actions / reviewdog-errors

[Lint Errors] reported by reviewdog 🐶 Error return value of `handler.net.Relay` is not checked (errcheck) Raw Output: data/txHandler.go:862:19: Error return value of `handler.net.Relay` is not checked (errcheck) handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) ^

return network.OutgoingMessage{
Action: network.Accept,
}
Expand Down
147 changes: 146 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func TestMaxSizesCorrect(t *testing.T) {
// N -- R -- A and ensures N can discover A and download blocks from it.
//
// N is a non-part node that joins the network later
// R is a non-archival relay node with block service disabled. It MUST NOT service blocks to force N to discover A.
// R is a non-archival relay node with block service disabled. It MUST NOT serve blocks to force N to discover A.
// A is a archival node that can only provide blocks.
// Nodes N and A have only R in their initial phonebook, and all nodes are in hybrid mode.
func TestNodeHybridTopology(t *testing.T) {
Expand Down Expand Up @@ -1112,3 +1112,148 @@ func TestNodeSetCatchpointCatchupMode(t *testing.T) {
})
}
}

// TestNodeHybridP2PGossipSend set ups 3 nodes network with the following topology:
// N0 -- R -- N2 where N0 is wsnet only, R is a relay hybrid node, and N2 is p2pnet only.
//
// N0 is the only blocks producer, and N2 is the only transaction supplier.
// Test ensures that a hybrid R relay can properly deliver transactions to N0.
func TestNodeHybridP2PGossipSend(t *testing.T) {
partitiontest.PartitionTest(t)

const consensusTest0 = protocol.ConsensusVersion("test0")

configurableConsensus := make(config.ConsensusProtocols)

testParams0 := config.Consensus[protocol.ConsensusCurrentVersion]
testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond
configurableConsensus[consensusTest0] = testParams0

// configure the stake to have R and A producing and confirming blocks
const totalStake = 100_000_000_000
const npnStake = 1_000_000
const nodeStake = totalStake - npnStake
const numAccounts = 3
acctStake := make([]basics.MicroAlgos, numAccounts)
acctStake[0] = basics.MicroAlgos{Raw: nodeStake}
acctStake[1] = basics.MicroAlgos{}
acctStake[2] = basics.MicroAlgos{Raw: npnStake}

configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) {
cfg = config.GetDefaultLocal()
if ni.idx != 1 {
cfg.EnableBlockService = false
cfg.EnableGossipBlockService = false
cfg.EnableLedgerService = false
cfg.CatchpointInterval = 0
cfg.Archival = false
cfg.NetAddress = ""
} else {
// node 1 is archival
cfg.EnableBlockService = true
cfg.EnableGossipBlockService = true
cfg.NetAddress = ni.wsNetAddr()
cfg.EnableP2PHybridMode = true
cfg.PublicAddress = ni.wsNetAddr()
cfg.P2PPersistPeerID = true
privKey, err := p2p.GetPrivKey(cfg, ni.rootDir)
require.NoError(t, err)
ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic())
require.NoError(t, err)

cfg.P2PHybridNetAddress = ni.p2pNetAddr()
}
if ni.idx == 2 {
// node 2 is p2p only
cfg.EnableP2PHybridMode = false
cfg.EnableP2P = true
}
return ni, cfg
}

phonebookHook := func(ni []nodeInfo, i int) []string {
switch i {
case 0:
// node 0 (N0) connects to R
t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr())
return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()}
case 1:
// node 1 (R) is a relay accepting connections from all
t.Logf("Node%d phonebook: empty", i)
return []string{}
case 2:
// node 2 (A) connects to R
t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr())
return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()}
default:
t.Errorf("not expected number of nodes: %d", i)
t.FailNow()
}
return nil
}

nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook)
require.Len(t, nodes, 3)
require.Len(t, wallets, 3)
for i := 0; i < len(nodes); i++ {
defer os.Remove(wallets[i])
defer nodes[i].Stop()
}

startAndConnectNodes(nodes, nodelayFirstNodeStartDelay)

// ensure the initial connectivity topology
require.Eventually(t, func() bool {
node0Conn := len(nodes[0].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1
node1Conn := len(nodes[1].net.GetPeers(network.PeersConnectedOut, network.PeersConnectedIn)) == 2 // connected from 0 and 2
node2Conn := len(nodes[2].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1
return node0Conn && node1Conn && node2Conn
}, 60*time.Second, 500*time.Millisecond)

filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2])
access, err := db.MakeAccessor(filename, false, false)
require.NoError(t, err)
root, err := account.RestoreRoot(access)
access.Close()
require.NoError(t, err)

addr2 := root.Address()
secrets2 := root.Secrets()

txn := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addr2,
FirstValid: 1,
LastValid: 100,
Fee: basics.MicroAlgos{Raw: 1000},
GenesisID: nodes[2].genesisID,
GenesisHash: nodes[2].genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: addr2,
Amount: basics.MicroAlgos{Raw: 0},
},
}
signature := secrets2.Sign(txn)
stxn := transactions.SignedTxn{
Sig: signature,
Txn: txn,
}

err = nodes[2].BroadcastSignedTxGroup([]transactions.SignedTxn{stxn})
require.NoError(t, err)

initialRound := nodes[0].ledger.NextRound()
targetRound := initialRound + 10

// ensure discovery of archival node by tracking its ledger
select {
case <-nodes[2].ledger.Wait(targetRound):
b, err := nodes[0].ledger.Block(targetRound)
require.NoError(t, err)
require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources
case <-time.After(3 * time.Minute): // set it to 1.5x of the dht.periodicBootstrapInterval to give DHT code to rebuild routing table one more time
require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0]))
}
}

0 comments on commit ef7c93b

Please sign in to comment.