diff --git a/config/config.go b/config/config.go index 87440cc58a..65d711cacc 100644 --- a/config/config.go +++ b/config/config.go @@ -23,6 +23,7 @@ import ( "os" "os/user" "path/filepath" + "strings" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/codecs" @@ -147,7 +148,17 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) { defer f.Close() err = loadConfig(f, &source) + if err != nil { + return source, err + } + source, err = enrichNetworkingConfig(source) + return source, err +} +// enrichNetworkingConfig makes the following tweaks to the config: +// - If NetAddress is set, enable the ledger and block services +// - If EnableP2PHybridMode is set, require PublicAddress to be set +func enrichNetworkingConfig(source Local) (Local, error) { // If the PublicAddress in config file has the PlaceholderPublicAddress, treat it as if it were empty if source.PublicAddress == PlaceholderPublicAddress { source.PublicAddress = "" @@ -163,8 +174,13 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) { source.GossipFanout = defaultRelayGossipFanout } } - - return source, err + // In hybrid mode we want to prevent connections from the same node over both P2P and WS. + // The only way it is supported at the moment is to use net identity challenge that is based on PublicAddress. + if (source.NetAddress != "" || source.P2PNetAddress != "") && source.EnableP2PHybridMode && source.PublicAddress == "" { + return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set") + } + source.PublicAddress = strings.ToLower(source.PublicAddress) + return source, nil } func loadConfig(reader io.Reader, config *Local) error { diff --git a/config/config_test.go b/config/config_test.go index 432c0f9281..20338766c3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -121,6 +121,62 @@ func TestLocal_MergeConfig(t *testing.T) { require.Equal(t, c1.GossipFanout, c2.GossipFanout) } +func TestLocal_EnrichNetworkingConfig(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + c1 := Local{ + NetAddress: "test1", + GossipFanout: defaultLocal.GossipFanout, + } + c2, err := enrichNetworkingConfig(c1) + require.NoError(t, err) + require.NotEqual(t, c1, c2) + require.False(t, c1.EnableLedgerService) + require.False(t, c1.EnableBlockService) + require.Equal(t, c1.GossipFanout, defaultLocal.GossipFanout) + require.True(t, c2.EnableLedgerService) + require.True(t, c2.EnableBlockService) + require.Equal(t, c2.GossipFanout, defaultRelayGossipFanout) + + c1 = Local{ + EnableP2PHybridMode: true, + } + c2, err = enrichNetworkingConfig(c1) + require.NoError(t, err) + + c1 = Local{ + NetAddress: "test1", + EnableP2PHybridMode: true, + } + c2, err = enrichNetworkingConfig(c1) + require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") + + c1 = Local{ + P2PNetAddress: "test1", + EnableP2PHybridMode: true, + } + c2, err = enrichNetworkingConfig(c1) + require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set") + + c1 = Local{ + EnableP2PHybridMode: true, + PublicAddress: "test2", + } + c2, err = enrichNetworkingConfig(c1) + require.NoError(t, err) + require.Equal(t, c1, c2) + require.True(t, c2.EnableP2PHybridMode) + require.NotEmpty(t, c2.PublicAddress) + + c1 = Local{ + PublicAddress: "R1.test3.my-domain.tld", + } + c2, err = enrichNetworkingConfig(c1) + require.NoError(t, err) + require.Equal(t, "r1.test3.my-domain.tld", c2.PublicAddress) +} + func saveFullPhonebook(phonebook phonebookBlackWhiteList, saveToDir string) error { filename := filepath.Join(saveToDir, PhonebookFilename) f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) @@ -559,9 +615,68 @@ func TestLocal_IsGossipServer(t *testing.T) { cfg := GetDefaultLocal() require.False(t, cfg.IsGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) cfg.NetAddress = ":4160" require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) + + cfg.EnableGossipService = false + // EnableGossipService does not matter + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2P = true + cfg.NetAddress = ":4160" + require.True(t, cfg.IsGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2P = false + + cfg.EnableP2PHybridMode = true + // with net address set it is ws net gossip server + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.NetAddress = "" + require.False(t, cfg.IsGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.P2PNetAddress = ":4190" + require.True(t, cfg.IsGossipServer()) + require.False(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.NetAddress = ":4160" + cfg.P2PNetAddress = ":4190" + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.EnableP2P = true + cfg.NetAddress = ":4160" + cfg.P2PNetAddress = ":4190" + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.True(t, cfg.IsP2PGossipServer()) + + cfg.EnableP2PHybridMode = true + cfg.EnableP2P = true + cfg.NetAddress = ":4160" + cfg.P2PNetAddress = "" + require.True(t, cfg.IsGossipServer()) + require.True(t, cfg.IsWsGossipServer()) + require.False(t, cfg.IsP2PGossipServer()) } func TestLocal_RecalculateConnectionLimits(t *testing.T) { @@ -569,45 +684,59 @@ func TestLocal_RecalculateConnectionLimits(t *testing.T) { t.Parallel() var tests = []struct { - maxFDs uint64 - reservedIn uint64 - restSoftIn uint64 - restHardIn uint64 - incomingIn int - - updated bool - restSoftExp uint64 - restHardExp uint64 - incomingExp int + maxFDs uint64 + reservedIn uint64 + restSoftIn uint64 + restHardIn uint64 + incomingIn int + p2pIncomingIn int + + updated bool + restSoftExp uint64 + restHardExp uint64 + incomingExp int + p2pIncomingExp int }{ - {100, 10, 20, 40, 50, false, 20, 40, 50}, // no change - {100, 10, 20, 50, 50, true, 20, 40, 50}, // borrow from rest - {100, 10, 25, 50, 50, true, 25, 40, 50}, // borrow from rest - {100, 10, 50, 50, 50, true, 40, 40, 50}, // borrow from rest, update soft - {100, 10, 9, 19, 81, true, 9, 10, 80}, // borrow from both rest and incoming - {100, 10, 10, 20, 80, true, 10, 10, 80}, // borrow from both rest and incoming - {100, 50, 10, 30, 40, true, 10, 10, 40}, // borrow from both rest and incoming - {100, 90, 10, 30, 40, true, 10, 10, 0}, // borrow from both rest and incoming, clear incoming - {4096, 256, 1024, 2048, 2400, true, 1024, 1440, 2400}, // real numbers - {5000, 256, 1024, 2048, 2400, false, 1024, 2048, 2400}, // real numbers + {100, 10, 20, 40, 50, 0, false, 20, 40, 50, 0}, // no change + {100, 10, 20, 50, 50, 0, true, 20, 40, 50, 0}, // borrow from rest + {100, 10, 25, 50, 50, 0, true, 25, 40, 50, 0}, // borrow from rest + {100, 10, 25, 50, 50, 50, true, 10, 10, 40, 40}, // borrow from rest for incoming and p2p incoming + {100, 10, 50, 50, 50, 0, true, 40, 40, 50, 0}, // borrow from rest, update soft + {100, 10, 50, 50, 40, 10, true, 40, 40, 40, 10}, // borrow from rest, update soft for incoming and p2p incoming + {100, 10, 9, 19, 81, 0, true, 9, 10, 80, 0}, // borrow from both rest and incoming + {100, 10, 9, 19, 41, 41, true, 9, 10, 40, 40}, // borrow from both rest and incoming for incoming and p2p incoming + {100, 90, 10, 30, 40, 0, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming + {100, 90, 10, 30, 40, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming + {100, 90, 10, 30, 50, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming + {4096, 256, 1024, 2048, 2400, 0, true, 1024, 1440, 2400, 0}, // real numbers + {5000, 256, 1024, 2048, 2400, 0, false, 1024, 2048, 2400, 0}, // real numbers + {4096, 256, 1024, 2048, 2400, 1200, true, 240, 240, 2400, 1200}, // real numbers + {6000, 256, 1024, 2048, 2400, 1200, false, 1024, 2048, 2400, 1200}, // real numbers } for i, test := range tests { test := test - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Run(fmt.Sprintf("test=%d", i), func(t *testing.T) { t.Parallel() c := Local{ - RestConnectionsSoftLimit: test.restSoftIn, - RestConnectionsHardLimit: test.restHardIn, - IncomingConnectionsLimit: test.incomingIn, + NetAddress: ":4160", + RestConnectionsSoftLimit: test.restSoftIn, + RestConnectionsHardLimit: test.restHardIn, + IncomingConnectionsLimit: test.incomingIn, + P2PIncomingConnectionsLimit: test.p2pIncomingIn, + } + if test.p2pIncomingIn > 0 { + c.EnableP2PHybridMode = true + c.P2PNetAddress = ":4190" } - requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn) + requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn) + uint64(test.p2pIncomingIn) res := c.AdjustConnectionLimits(requireFDs, test.maxFDs) require.Equal(t, test.updated, res) - require.Equal(t, test.restSoftExp, c.RestConnectionsSoftLimit) - require.Equal(t, test.restHardExp, c.RestConnectionsHardLimit) - require.Equal(t, test.incomingExp, c.IncomingConnectionsLimit) + require.Equal(t, int(test.restSoftExp), int(c.RestConnectionsSoftLimit)) + require.Equal(t, int(test.restHardExp), int(c.RestConnectionsHardLimit)) + require.Equal(t, int(test.incomingExp), int(c.IncomingConnectionsLimit)) + require.Equal(t, int(test.p2pIncomingExp), int(c.P2PIncomingConnectionsLimit)) }) } } diff --git a/config/localTemplate.go b/config/localTemplate.go index 314b83a78b..9583a194cd 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -134,6 +134,8 @@ type Local struct { // Estimating 1.5MB per incoming connection, 1.5MB*2400 = 3.6GB IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800" version[27]:"2400"` + P2PIncomingConnectionsLimit int `version[34]:"1200"` + // BroadcastConnectionsLimit specifies the number of connections that // will receive broadcast (gossip) messages from this node. If the // node has more connections than this number, it will send broadcasts @@ -602,6 +604,7 @@ type Local struct { EnableP2P bool `version[31]:"false"` // EnableP2PHybridMode turns on both websockets and P2P networking. + // Enabling this setting also requires PublicAddress to be set. EnableP2PHybridMode bool `version[34]:"false"` // P2PNetAddress sets the listen address used for P2P networking, if hybrid mode is set. @@ -734,10 +737,21 @@ func (cfg Local) TxFilterCanonicalEnabled() bool { return cfg.TxIncomingFilteringFlags&txFilterCanonical != 0 } -// IsGossipServer returns true if NetAddress is set and this node supposed -// to start websocket server +// IsGossipServer returns true if this node supposed to start websocket or p2p server func (cfg Local) IsGossipServer() bool { - return cfg.NetAddress != "" + return cfg.IsWsGossipServer() || cfg.IsP2PGossipServer() +} + +// IsWsGossipServer returns true if a node configured to run a listening ws net +func (cfg Local) IsWsGossipServer() bool { + // 1. NetAddress is set and EnableP2P is not set + // 2. NetAddress is set and EnableP2PHybridMode is set then EnableP2P is overridden by EnableP2PHybridMode + return cfg.NetAddress != "" && (!cfg.EnableP2P || cfg.EnableP2PHybridMode) +} + +// IsP2PGossipServer returns true if a node configured to run a listening p2p net +func (cfg Local) IsP2PGossipServer() bool { + return (cfg.EnableP2P && !cfg.EnableP2PHybridMode && cfg.NetAddress != "") || (cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "") } // ensureAbsGenesisDir will convert a path to absolute, and will attempt to make a genesis directory there @@ -935,10 +949,24 @@ func (cfg *Local) AdjustConnectionLimits(requiredFDs, maxFDs uint64) bool { if cfg.RestConnectionsHardLimit <= diff+reservedRESTConns { restDelta := diff + reservedRESTConns - cfg.RestConnectionsHardLimit cfg.RestConnectionsHardLimit = reservedRESTConns - if cfg.IncomingConnectionsLimit > int(restDelta) { - cfg.IncomingConnectionsLimit -= int(restDelta) - } else { - cfg.IncomingConnectionsLimit = 0 + splitRatio := 1 + if cfg.IsWsGossipServer() && cfg.IsP2PGossipServer() { + // split the rest of the delta between ws and p2p evenly + splitRatio = 2 + } + if cfg.IsWsGossipServer() { + if cfg.IncomingConnectionsLimit > int(restDelta) { + cfg.IncomingConnectionsLimit -= int(restDelta) / splitRatio + } else { + cfg.IncomingConnectionsLimit = 0 + } + } + if cfg.IsP2PGossipServer() { + if cfg.P2PIncomingConnectionsLimit > int(restDelta) { + cfg.P2PIncomingConnectionsLimit -= int(restDelta) / splitRatio + } else { + cfg.P2PIncomingConnectionsLimit = 0 + } } } else { cfg.RestConnectionsHardLimit -= diff diff --git a/config/local_defaults.go b/config/local_defaults.go index ae2ed22ebf..57457531be 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -119,6 +119,7 @@ var defaultLocal = Local{ OptimizeAccountsDatabaseOnStartup: false, OutgoingMessageFilterBucketCount: 3, OutgoingMessageFilterBucketSize: 128, + P2PIncomingConnectionsLimit: 1200, P2PNetAddress: "", P2PPersistPeerID: false, P2PPrivateKeyLocation: "", diff --git a/daemon/algod/server.go b/daemon/algod/server.go index c43b0b0693..309fdc5799 100644 --- a/daemon/algod/server.go +++ b/daemon/algod/server.go @@ -149,9 +149,21 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes if cfg.IsGossipServer() { var ot basics.OverflowTracker - fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)+network.ReservedHealthServiceConnections) + fdRequired = ot.Add(fdRequired, network.ReservedHealthServiceConnections) if ot.Overflowed { - return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit or IncomingConnectionsLimit") + return errors.New("Initialize() overflowed when adding up ReservedHealthServiceConnections to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit") + } + if cfg.IsWsGossipServer() { + fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)) + if ot.Overflowed { + return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease IncomingConnectionsLimit") + } + } + if cfg.IsP2PGossipServer() { + fdRequired = ot.Add(fdRequired, uint64(cfg.P2PIncomingConnectionsLimit)) + if ot.Overflowed { + return errors.New("Initialize() overflowed when adding up P2PIncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease P2PIncomingConnectionsLimit") + } } _, hard, fdErr := util.GetFdLimits() if fdErr != nil { @@ -164,14 +176,18 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes // but try to keep cfg.ReservedFDs untouched by decreasing other limits if cfg.AdjustConnectionLimits(fdRequired, hard) { s.log.Warnf( - "Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d", + "Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d, P2PIncomingConnectionsLimit=%d", cfg.RestConnectionsSoftLimit, cfg.RestConnectionsHardLimit, cfg.IncomingConnectionsLimit, + cfg.P2PIncomingConnectionsLimit, ) - if cfg.IncomingConnectionsLimit == 0 { + if cfg.IsWsGossipServer() && cfg.IncomingConnectionsLimit == 0 { return errors.New("Initialize() failed to adjust connection limits") } + if cfg.IsP2PGossipServer() && cfg.P2PIncomingConnectionsLimit == 0 { + return errors.New("Initialize() failed to adjust p2p connection limits") + } } } fdErr = util.SetFdSoftLimit(maxFDs) diff --git a/installer/config.json.example b/installer/config.json.example index 7f16155303..3a9714bbfb 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -98,6 +98,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, + "P2PIncomingConnectionsLimit": 1200, "P2PNetAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", diff --git a/netdeploy/remote/deployedNetwork.go b/netdeploy/remote/deployedNetwork.go index 25de422026..ce72071ff0 100644 --- a/netdeploy/remote/deployedNetwork.go +++ b/netdeploy/remote/deployedNetwork.go @@ -1005,6 +1005,16 @@ func createHostSpec(host HostConfig, template cloudHost) (hostSpec cloudHostSpec portList = append(portList, strconv.Itoa(port)) } } + if node.P2PNetAddress != "" { + port, err = extractPublicPort(node.P2PNetAddress) + if err != nil { + return + } + if !ports[port] { + ports[port] = true + portList = append(portList, strconv.Itoa(port)) + } + } // See if the APIEndpoint is open to the public, and if so add it // Error means it's not valid/specified as public port diff --git a/netdeploy/remote/nodeConfig.go b/netdeploy/remote/nodeConfig.go index 4880d76eb9..bd4b63dac8 100644 --- a/netdeploy/remote/nodeConfig.go +++ b/netdeploy/remote/nodeConfig.go @@ -35,6 +35,8 @@ type NodeConfig struct { DeadlockOverride int `json:",omitempty"` // -1 = Disable deadlock detection, 0 = Use Default for build, 1 = Enable ConfigJSONOverride string `json:",omitempty"` // Raw json to merge into config.json after other modifications are complete P2PBootstrap bool // True if this node should be a p2p bootstrap node and registered in DNS + P2PNetAddress string `json:",omitempty"` + PublicAddress bool // NodeNameMatchRegex is tested against Name in generated configs and if matched the rest of the configs in this record are applied as a template NodeNameMatchRegex string `json:",omitempty"` diff --git a/netdeploy/remote/nodecfg/nodeConfigurator.go b/netdeploy/remote/nodecfg/nodeConfigurator.go index 842570bfc8..8e6bea9718 100644 --- a/netdeploy/remote/nodecfg/nodeConfigurator.go +++ b/netdeploy/remote/nodecfg/nodeConfigurator.go @@ -93,6 +93,10 @@ func (nc *nodeConfigurator) apply(rootConfigDir, rootNodeDir string) (err error) nc.genesisFile = filepath.Join(rootConfigDir, "genesisdata", config.GenesisJSONFile) nc.genesisData, err = bookkeeping.LoadGenesisFromFile(nc.genesisFile) + if err != nil { + return fmt.Errorf("error loading genesis from '%s': %v", nc.genesisFile, err) + + } nodeDirs, err := nc.prepareNodeDirs(nc.config.Nodes, rootConfigDir, rootNodeDir) if err != nil { return fmt.Errorf("error preparing node directories: %v", err) @@ -198,6 +202,11 @@ func (nc *nodeConfigurator) prepareNodeDirs(configs []remote.NodeConfig, rootCon return } +// getHostName creates a DNS name for a host +func (nc *nodeConfigurator) getNetworkHostName() string { + return nc.config.Name + "." + string(nc.genesisData.Network) + ".algodev.network" +} + func (nc *nodeConfigurator) registerDNSRecords() (err error) { cfZoneID, cfToken, err := getClouldflareCredentials() if err != nil { @@ -210,12 +219,13 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { const weight = 1 const relayBootstrap = "_algobootstrap" const metricsSrv = "_metrics" + const tcpProto = "_tcp" const proxied = false // If we need to register anything, first register a DNS entry // to map our network DNS name to our public name (or IP) provided to nodecfg // Network HostName = eg r1.testnet.algodev.network - networkHostName := nc.config.Name + "." + string(nc.genesisData.Network) + ".algodev.network" + networkHostName := nc.getNetworkHostName() isIP := net.ParseIP(nc.dnsName) != nil var recordType string if isIP { @@ -232,9 +242,10 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { if parseErr != nil { return parseErr } - fmt.Fprintf(os.Stdout, "...... Adding Relay SRV Record '%s' -> '%s' .\n", entry.srvName, networkHostName) + fmt.Fprintf(os.Stdout, "...... Adding Relay SRV Record [%s.%s] '%s' [%d %d] -> '%s' .\n", + relayBootstrap, tcpProto, entry.srvName, priority, port, networkHostName) err = cloudflareDNS.SetSRVRecord(context.Background(), entry.srvName, networkHostName, - cloudflare.AutomaticTTL, priority, uint(port), relayBootstrap, "_tcp", weight) + cloudflare.AutomaticTTL, priority, uint(port), relayBootstrap, tcpProto, weight) if err != nil { return } @@ -246,9 +257,10 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { fmt.Fprintf(os.Stdout, "Error parsing port for srv record: %s (port %v)\n", parseErr, entry) return parseErr } - fmt.Fprintf(os.Stdout, "...... Adding Metrics SRV Record '%s' -> '%s' .\n", entry.srvName, networkHostName) + fmt.Fprintf(os.Stdout, "...... Adding Metrics SRV Record [%s.%s] '%s' [%d %d] -> '%s' .\n", + metricsSrv, tcpProto, entry.srvName, priority, port, networkHostName) err = cloudflareDNS.SetSRVRecord(context.Background(), entry.srvName, networkHostName, - cloudflare.AutomaticTTL, priority, uint(port), metricsSrv, "_tcp", weight) + cloudflare.AutomaticTTL, priority, uint(port), metricsSrv, tcpProto, weight) if err != nil { fmt.Fprintf(os.Stdout, "Error creating srv record: %s (%v)\n", err, entry) return diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index bdfc037438..304fa4c636 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -104,6 +104,11 @@ func (nd *nodeDir) configure() (err error) { return } + if err = nd.configurePublicAddress(nd.PublicAddress); err != nil { + fmt.Fprintf(os.Stdout, "Error during configurePublicAddress: %s\n", err) + return + } + if err = nd.configureP2PDNSBootstrap(nd.P2PBootstrap); err != nil { fmt.Fprintf(os.Stdout, "Error during configureP2PDNSBootstrap: %s\n", err) return @@ -155,15 +160,46 @@ func (nd *nodeDir) configureNetAddress() (err error) { fmt.Fprintf(os.Stdout, " - Assigning NetAddress: %s\n", nd.NetAddress) nd.config.NetAddress = nd.NetAddress if nd.IsRelay() && nd.NetAddress[0] == ':' { - fmt.Fprintf(os.Stdout, " - adding to relay addresses\n") - for _, bootstrapRecord := range nd.config.DNSBootstrapArray(nd.configurator.genesisData.Network) { - nd.configurator.addRelaySrv(bootstrapRecord.PrimarySRVBootstrap, nd.NetAddress) + if nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { + fmt.Fprintf(os.Stdout, " - skipping relay addresses - p2p mode\n") + } else { + fmt.Fprintf(os.Stdout, " - adding to relay addresses\n") + for _, bootstrapRecord := range nd.config.DNSBootstrapArray(nd.configurator.genesisData.Network) { + nd.configurator.addRelaySrv(bootstrapRecord.PrimarySRVBootstrap, nd.NetAddress) + } } } + if nd.P2PNetAddress != "" { + fmt.Fprintf(os.Stdout, " - Assigning P2PNetAddress: %s\n", nd.P2PNetAddress) + nd.config.P2PNetAddress = nd.P2PNetAddress + } err = nd.saveConfig() return } +func (nd *nodeDir) configurePublicAddress(publicAddress bool) error { + if !publicAddress { + return nil + } + if !nd.IsRelay() { + return errors.New("publicAddress is only valid for relay nodes") + } + if nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { + return errors.New("publicAddress is only valid websocket gossip node or a hybrid mode node") + } + + if err := nd.ensureConfig(); err != nil { + return err + } + + if nd.NetAddress[0] == ':' { + networkHostName := nd.configurator.getNetworkHostName() + nd.NetAddress + nd.config.PublicAddress = strings.ToLower(networkHostName) + fmt.Fprintf(os.Stdout, " - Assigning PublicAddress: %s\n", networkHostName) + } + return nd.saveConfig() +} + func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { if !p2pBootstrap { return nil @@ -179,7 +215,7 @@ func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { if !nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { return errors.New("p2p bootstrap requires EnableP2P or EnableP2PHybridMode to be set") } - if nd.NetAddress == "" && nd.config.P2PNetAddress == "" { + if nd.NetAddress == "" && nd.P2PNetAddress == "" { return errors.New("p2p bootstrap requires NetAddress or P2PNetAddress to be set") } if !nd.config.EnableGossipService { @@ -187,8 +223,8 @@ func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { } netAddress := nd.NetAddress - if nd.config.P2PNetAddress != "" { - netAddress = nd.config.P2PNetAddress + if nd.P2PNetAddress != "" { + netAddress = nd.P2PNetAddress } key, err := p2p.GetPrivKey(config.Local{P2PPersistPeerID: true}, nd.dataDir) diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 27fc6edbb0..d30e03cee2 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -42,11 +42,17 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p // supply alternate NetAddress for P2P network p2pcfg := cfg p2pcfg.NetAddress = cfg.P2PNetAddress - p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo) + identityTracker := NewIdentityTracker() + p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo, &identityOpts{tracker: identityTracker}) if err != nil { return nil, err } - wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, p2pnet.PeerID(), p2pnet.PeerIDSigner()) + + identOpts := identityOpts{ + tracker: identityTracker, + scheme: NewIdentityChallengeScheme(NetIdentityDedupNames(cfg.PublicAddress, p2pnet.PeerID().String()), NetIdentitySigner(p2pnet.PeerIDSigner())), + } + wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, &identOpts) if err != nil { return nil, err } diff --git a/network/hybridNetwork_test.go b/network/hybridNetwork_test.go new file mode 100644 index 0000000000..7c76c1e38e --- /dev/null +++ b/network/hybridNetwork_test.go @@ -0,0 +1,183 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "net/url" + "testing" + "time" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +// TestHybridNetwork_DuplicateConn checks the same nodes do not connect over ws and p2p. +// Scenario: +// 1. Create a hybrid network: relay and two nodes +// 2. Let them connect to the relay +// 3. Ensure relay has only two connections +// 4. Ensure extra connection attempts were rejected by nodes rather than relay +func TestHybridNetwork_DuplicateConn(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.EnableP2PHybridMode = true + log := logging.TestingLog(t) + const p2pKeyDir = "" + + identDiscValue := networkPeerIdentityDisconnect.GetUint64Value() + + relayCfg := cfg + relayCfg.ForceRelayMessages = true + netA, err := NewHybridP2PNetwork(log.With("node", "netA"), relayCfg, p2pKeyDir, nil, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + + err = netA.Start() + require.NoError(t, err) + + // collect ws address + addr, portListen := netA.wsNetwork.Address() + require.True(t, portListen) + require.NotZero(t, addr) + parsed, err := url.Parse(addr) + require.NoError(t, err) + addr = parsed.Host + netA.Stop() + + // make it net address and restart the node + relayCfg.NetAddress = addr + relayCfg.PublicAddress = addr + relayCfg.P2PNetAddress = ":0" + netA, err = NewHybridP2PNetwork(log.With("node", "netA"), relayCfg, p2pKeyDir, nil, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + // collect relay address and prepare nodes phonebook + peerInfoA := netA.p2pNetwork.service.AddrInfo() + addrsAp2p, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsAp2p[0]) + multiAddrStr := addrsAp2p[0].String() + + fullAddr, portListen := netA.wsNetwork.Address() + require.True(t, portListen) + require.NotZero(t, addr) + require.Contains(t, fullAddr, addr) + + phoneBookAddresses := []string{multiAddrStr, addr} + + netB, err := NewHybridP2PNetwork(log.With("node", "netB"), cfg, "", phoneBookAddresses, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + // for netB start the p2p network first + err = netB.p2pNetwork.Start() + require.NoError(t, err) + defer netB.Stop() + + netC, err := NewHybridP2PNetwork(log.With("node", "netC"), cfg, "", phoneBookAddresses, genesisID, "net", &nopeNodeInfo{}) + require.NoError(t, err) + // for netC start the ws network first + err = netC.wsNetwork.Start() + require.NoError(t, err) + defer netC.Stop() + + // ensure initial connections are done + require.Eventually(t, func() bool { + return len(netA.GetPeers(PeersConnectedIn)) == 2 + }, 3*time.Second, 50*time.Millisecond) + + // start the second half of the hybrid net + err = netB.wsNetwork.Start() + require.NoError(t, err) + err = netC.p2pNetwork.Start() + require.NoError(t, err) + + // wait for connection attempts. nodes need some time to make connections, + // and instead of `time.Sleep(1 * time.Second)` the networkPeerIdentityDisconnect net identity counter is used. + // Since this test is not parallel the networkPeerIdentityDisconnect should not be modified from outside. + // Both netB and netC are attempting to connect but netA could also open an outgoing stream in netB or netC connection. + // So, the counter should be at least 2+identDiscValue. + const waitFor = 3 * time.Second + const checkEvery = 50 * time.Millisecond + const maxTicks = int(waitFor / checkEvery) + const debugThreshold = maxTicks - maxTicks/20 // log last 5% of ticks + require.Greater(t, debugThreshold, 1) + require.Less(t, debugThreshold, maxTicks) + tickCounter := 0 + require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + log.Infof("networkPeerIdentityDisconnect: %d\n", networkPeerIdentityDisconnect.GetUint64Value()) + } + tickCounter++ + return networkPeerIdentityDisconnect.GetUint64Value() >= 2+identDiscValue + }, waitFor, checkEvery) + + // now count connections + // netA should have 2 connections, not 4 + // netB should have 1 connection (via p2p) + // netC should have 1 connection (via ws) + + tickCounter = 0 + require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + netAIn := len(netA.GetPeers(PeersConnectedIn)) + netAOut := len(netA.GetPeers(PeersConnectedOut)) + netBIn := len(netB.GetPeers(PeersConnectedIn)) + netBOut := len(netB.GetPeers(PeersConnectedOut)) + netCIn := len(netC.GetPeers(PeersConnectedIn)) + netCOut := len(netC.GetPeers(PeersConnectedOut)) + log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) + } + tickCounter++ + return len(netB.GetPeers(PeersConnectedOut)) == 1 + }, waitFor, checkEvery) + + tickCounter = 0 + require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + netAIn := len(netA.GetPeers(PeersConnectedIn)) + netAOut := len(netA.GetPeers(PeersConnectedOut)) + netBIn := len(netB.GetPeers(PeersConnectedIn)) + netBOut := len(netB.GetPeers(PeersConnectedOut)) + netCIn := len(netC.GetPeers(PeersConnectedIn)) + netCOut := len(netC.GetPeers(PeersConnectedOut)) + log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) + } + tickCounter++ + return len(netC.GetPeers(PeersConnectedOut)) == 1 + }, waitFor, checkEvery) + + tickCounter = 0 + require.Eventually(t, func() bool { + if tickCounter >= debugThreshold { + netAIn := len(netA.GetPeers(PeersConnectedIn)) + netAOut := len(netA.GetPeers(PeersConnectedOut)) + netBIn := len(netB.GetPeers(PeersConnectedIn)) + netBOut := len(netB.GetPeers(PeersConnectedOut)) + netCIn := len(netC.GetPeers(PeersConnectedIn)) + netCOut := len(netC.GetPeers(PeersConnectedOut)) + log.Infof("netA in/out: %d/%d, netB in/out: %d/%d, netC in/out: %d/%d\n", netAIn, netAOut, netBIn, netBOut, netCIn, netCOut) + } + tickCounter++ + return len(netA.GetPeers(PeersConnectedIn)) == 2 + }, 3*time.Second, 50*time.Millisecond) +} diff --git a/network/netidentity.go b/network/netidentity.go index 4d797a1a5b..30755f0648 100644 --- a/network/netidentity.go +++ b/network/netidentity.go @@ -100,6 +100,11 @@ type identityChallengeSigner interface { PublicKey() crypto.PublicKey } +type identityOpts struct { + scheme identityChallengeScheme + tracker identityTracker +} + type identityChallengeLegacySigner struct { keys *crypto.SignatureSecrets } @@ -120,37 +125,81 @@ func (s *identityChallengeLegacySigner) PublicKey() crypto.PublicKey { // exchanging and verifying public key challenges and attaching them to headers, // or returning the message payload to be sent type identityChallengePublicKeyScheme struct { - dedupName string + dedupNames map[string]struct{} identityKeys identityChallengeSigner } +type identityChallengeSchemeConfig struct { + dedupNames []string + signer identityChallengeSigner +} + +// IdentityChallengeSchemeOption is a function that can be passed to NewIdentityChallengeScheme +type IdentityChallengeSchemeOption func(*identityChallengeSchemeConfig) + +// NetIdentityDedupNames is an option to set the deduplication names for the identity challenge scheme +func NetIdentityDedupNames(dn ...string) IdentityChallengeSchemeOption { + return func(c *identityChallengeSchemeConfig) { + c.dedupNames = append(c.dedupNames, dn...) + } +} + +// NetIdentitySigner is an option to set the signer for the identity challenge scheme +func NetIdentitySigner(s identityChallengeSigner) IdentityChallengeSchemeOption { + return func(c *identityChallengeSchemeConfig) { + c.signer = s + } +} + // NewIdentityChallengeScheme will create a default Identification Scheme -func NewIdentityChallengeScheme(dn string) *identityChallengePublicKeyScheme { - // without an deduplication name, there is no identityto manage, so just return an empty scheme - if dn == "" { +func NewIdentityChallengeScheme(opts ...IdentityChallengeSchemeOption) *identityChallengePublicKeyScheme { + // without an deduplication name, there is no identity to manage, so just return an empty scheme + if len(opts) == 0 { + return &identityChallengePublicKeyScheme{} + } + + config := identityChallengeSchemeConfig{} + for _, opt := range opts { + opt(&config) + } + + if len(config.dedupNames) == 0 { return &identityChallengePublicKeyScheme{} } + hasNonEmpty := false + dedupNames := make(map[string]struct{}, len(config.dedupNames)) + for _, name := range config.dedupNames { + if len(name) > 0 { + dedupNames[name] = struct{}{} + hasNonEmpty = true + } + } + if !hasNonEmpty { + return &identityChallengePublicKeyScheme{} + } + + if config.signer != nil { + return &identityChallengePublicKeyScheme{ + dedupNames: dedupNames, + identityKeys: config.signer, + } + } + var seed crypto.Seed crypto.RandBytes(seed[:]) - return &identityChallengePublicKeyScheme{ - dedupName: dn, + dedupNames: dedupNames, identityKeys: &identityChallengeLegacySigner{keys: crypto.GenerateSignatureSecrets(seed)}, } } -// NewIdentityChallengeSchemeWithSigner will create an identification Scheme with a given signer -func NewIdentityChallengeSchemeWithSigner(dn string, signer identityChallengeSigner) *identityChallengePublicKeyScheme { - return &identityChallengePublicKeyScheme{dedupName: dn, identityKeys: signer} -} - // AttachChallenge will generate a new identity challenge and will encode and attach the challenge // as a header. It returns the identityChallengeValue used for this challenge, so the network can // confirm it later (by passing it to VerifyResponse), or returns an empty challenge if dedupName is // not set. func (i identityChallengePublicKeyScheme) AttachChallenge(attachTo http.Header, addr string) identityChallengeValue { - if i.dedupName == "" || addr == "" { + if len(i.dedupNames) == 0 || addr == "" { return identityChallengeValue{} } c := identityChallenge{ @@ -172,7 +221,7 @@ func (i identityChallengePublicKeyScheme) AttachChallenge(attachTo http.Header, // or returns empty values if the header did not end up getting set func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachTo http.Header, h http.Header) (identityChallengeValue, crypto.PublicKey, error) { // if dedupName is not set, this scheme is not configured to exchange identity - if i.dedupName == "" { + if len(i.dedupNames) == 0 { return identityChallengeValue{}, crypto.PublicKey{}, nil } // if the headerString is not populated, the peer isn't participating in identity exchange @@ -193,10 +242,11 @@ func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachT if !idChal.Verify() { return identityChallengeValue{}, crypto.PublicKey{}, fmt.Errorf("identity challenge incorrectly signed") } + // if the address is not meant for this host, return without attaching headers, // but also do not emit an error. This is because if an operator were to incorrectly // specify their dedupName, it could result in inappropriate disconnections from valid peers - if string(idChal.Msg.PublicAddress) != i.dedupName { + if _, ok := i.dedupNames[string(idChal.Msg.PublicAddress)]; !ok { return identityChallengeValue{}, crypto.PublicKey{}, nil } // make the response object, encode it and attach it to the header @@ -216,7 +266,7 @@ func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachT // encoded identityVerificationMessage to send to the peer. Otherwise, it returns empty values. func (i identityChallengePublicKeyScheme) VerifyResponse(h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) { // if we are not participating in identity challenge exchange, do nothing (no error and no value) - if i.dedupName == "" { + if len(i.dedupNames) == 0 { return crypto.PublicKey{}, []byte{}, nil } headerString := h.Get(IdentityChallengeHeader) @@ -400,9 +450,16 @@ type identityTracker interface { setIdentity(p *wsPeer) bool } +// noopIdentityTracker implements identityTracker by doing nothing. +// Intended for pure p2p mode when libp2p is handling identities itself. +type noopIdentityTracker struct{} + +func (noopIdentityTracker) setIdentity(p *wsPeer) bool { return true } +func (noopIdentityTracker) removeIdentity(p *wsPeer) {} + // publicKeyIdentTracker implements identityTracker by // mapping from PublicKeys exchanged in identity challenges to a peer -// this structure is not thread-safe; it is protected by wn.peersLock. +// this structure is not thread-safe; it is protected by wn.peersLock or p2p.wsPeersLock type publicKeyIdentTracker struct { peersByID map[crypto.PublicKey]*wsPeer } diff --git a/network/netidentity_test.go b/network/netidentity_test.go index f87480c1b1..a54628a6fe 100644 --- a/network/netidentity_test.go +++ b/network/netidentity_test.go @@ -32,12 +32,15 @@ func TestIdentityChallengeSchemeAttachIfEnabled(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("") + i0 := NewIdentityChallengeScheme() + i := NewIdentityChallengeScheme(NetIdentityDedupNames("")) + require.Equal(t, i0, i) + require.Zero(t, *i) chal := i.AttachChallenge(h, "other") - require.Empty(t, h.Get(IdentityChallengeHeader)) - require.Empty(t, chal) + require.Zero(t, h.Get(IdentityChallengeHeader)) + require.Zero(t, chal) - j := NewIdentityChallengeScheme("yes") + j := NewIdentityChallengeScheme(NetIdentityDedupNames("yes")) chal = j.AttachChallenge(h, "other") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) require.NotEmpty(t, chal) @@ -48,7 +51,7 @@ func TestIdentityChallengeSchemeAttachIfEnabled(t *testing.T) { func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { partitiontest.PartitionTest(t) - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to the other scheme h := http.Header{} i.AttachChallenge(h, "i2") @@ -58,7 +61,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} i.AttachChallenge(h, "i2") r := http.Header{} - i2 := NewIdentityChallengeScheme("") + i2 := NewIdentityChallengeScheme() chal, key, err := i2.VerifyRequestAndAttachResponse(r, h) require.Empty(t, r.Get(IdentityChallengeHeader)) require.Empty(t, chal) @@ -69,7 +72,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} i.AttachChallenge(h, "i2") r = http.Header{} - i2 = NewIdentityChallengeScheme("not i2") + i2 = NewIdentityChallengeScheme(NetIdentityDedupNames("not i2")) chal, key, err = i2.VerifyRequestAndAttachResponse(r, h) require.Empty(t, r.Get(IdentityChallengeHeader)) require.Empty(t, chal) @@ -80,7 +83,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} h.Add(IdentityChallengeHeader, "garbage") r = http.Header{} - i2 = NewIdentityChallengeScheme("i2") + i2 = NewIdentityChallengeScheme(NetIdentityDedupNames("i2")) chal, key, err = i2.VerifyRequestAndAttachResponse(r, h) require.Empty(t, r.Get(IdentityChallengeHeader)) require.Empty(t, chal) @@ -91,7 +94,7 @@ func TestIdentityChallengeSchemeVerifyRequestAndAttachResponse(t *testing.T) { h = http.Header{} i.AttachChallenge(h, "i2") r = http.Header{} - i2 = NewIdentityChallengeScheme("i2") + i2 = NewIdentityChallengeScheme(NetIdentityDedupNames("i2")) chal, key, err = i2.VerifyRequestAndAttachResponse(r, h) require.NotEmpty(t, r.Get(IdentityChallengeHeader)) require.NotEmpty(t, chal) @@ -103,11 +106,11 @@ func TestIdentityChallengeNoErrorWhenNotParticipating(t *testing.T) { partitiontest.PartitionTest(t) // blank deduplication name will make the scheme a no-op - iNotParticipate := NewIdentityChallengeScheme("") + iNotParticipate := NewIdentityChallengeScheme() // create a request header first h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) require.NotEmpty(t, origChal) @@ -120,7 +123,7 @@ func TestIdentityChallengeNoErrorWhenNotParticipating(t *testing.T) { // create a response h2 := http.Header{} - i2 := NewIdentityChallengeScheme("i2") + i2 := NewIdentityChallengeScheme(NetIdentityDedupNames("i2")) i2.VerifyRequestAndAttachResponse(h2, h) // confirm a nil scheme will not return values or error @@ -148,7 +151,7 @@ func TestIdentityChallengeSchemeVerifyResponse(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -176,7 +179,7 @@ func TestIdentityChallengeSchemeBadSignature(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // Copy the logic of attaching the header and signing so we can sign it wrong c := identityChallengeSigned{ Msg: identityChallenge{ @@ -204,7 +207,7 @@ func TestIdentityChallengeSchemeBadPayload(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) h.Add(IdentityChallengeHeader, "NOT VALID BASE 64! :)") // observe that VerifyRequestAndAttachResponse won't do anything on bad signature @@ -222,7 +225,7 @@ func TestIdentityChallengeSchemeBadResponseSignature(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -253,7 +256,7 @@ func TestIdentityChallengeSchemeBadResponsePayload(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -275,7 +278,7 @@ func TestIdentityChallengeSchemeWrongChallenge(t *testing.T) { partitiontest.PartitionTest(t) h := http.Header{} - i := NewIdentityChallengeScheme("i1") + i := NewIdentityChallengeScheme(NetIdentityDedupNames("i1")) // author a challenge to ourselves origChal := i.AttachChallenge(h, "i1") require.NotEmpty(t, h.Get(IdentityChallengeHeader)) @@ -366,3 +369,67 @@ func TestIdentityTrackerHandlerGuard(t *testing.T) { } require.Equal(t, OutgoingMessage{}, identityVerificationHandler(msg)) } + +// TestNewIdentityChallengeScheme ensures NewIdentityChallengeScheme returns +// a correct identityChallengePublicKeyScheme for the following inputs: +// DedupNames(a, b) vs DedupNames(a), DedupNames(b) +// Empty vs non-empty PeerID, PublicAddress +// Empty vs non-empty Signer +func TestNewIdentityChallengeScheme(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + s1 := NewIdentityChallengeScheme() + s2 := NewIdentityChallengeScheme(NetIdentityDedupNames("")) + s3 := NewIdentityChallengeScheme(NetIdentityDedupNames("", "")) + s4 := NewIdentityChallengeScheme(NetIdentityDedupNames(""), NetIdentityDedupNames("")) + require.Equal(t, s1, s2) + require.Equal(t, s2, s3) + require.Equal(t, s3, s4) + require.Zero(t, *s1) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "a")) + s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("a")) + require.Equal(t, s1.dedupNames, s2.dedupNames) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.IsType(t, &identityChallengeLegacySigner{}, s2.identityKeys) + require.NotEqual(t, s1.identityKeys, s2.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "b")) + s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("b")) + require.Equal(t, s1.dedupNames, s2.dedupNames) + require.Len(t, s1.dedupNames, 2) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.IsType(t, &identityChallengeLegacySigner{}, s2.identityKeys) + require.NotEqual(t, s1.identityKeys, s2.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("", "a")) + s2 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentityDedupNames("")) + s3 = NewIdentityChallengeScheme(NetIdentityDedupNames("a", "")) + s4 = NewIdentityChallengeScheme(NetIdentityDedupNames(""), NetIdentityDedupNames("a")) + require.Equal(t, s1.dedupNames, s2.dedupNames) + require.Equal(t, s2.dedupNames, s3.dedupNames) + require.Equal(t, s3.dedupNames, s4.dedupNames) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.IsType(t, &identityChallengeLegacySigner{}, s2.identityKeys) + require.NotEqual(t, s1.identityKeys, s2.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentitySigner(&identityChallengeLegacySigner{})) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + + var seed crypto.Seed + crypto.RandBytes(seed[:]) + signer := &identityChallengeLegacySigner{keys: crypto.GenerateSignatureSecrets(seed)} + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames("a"), NetIdentitySigner(signer)) + require.Len(t, s1.dedupNames, 1) + require.IsType(t, &identityChallengeLegacySigner{}, s1.identityKeys) + require.Equal(t, signer, s1.identityKeys) + + s1 = NewIdentityChallengeScheme(NetIdentityDedupNames(""), NetIdentitySigner(signer)) + require.Empty(t, s1) + s1 = NewIdentityChallengeScheme(NetIdentitySigner(signer)) + require.Empty(t, s1) +} diff --git a/network/p2p/logger.go b/network/p2p/logger.go index 26c738e1e1..741755745b 100644 --- a/network/p2p/logger.go +++ b/network/p2p/logger.go @@ -19,6 +19,7 @@ package p2p import ( + "errors" "runtime" "strings" @@ -55,19 +56,37 @@ type loggingCore struct { zapcore.Core } +// ErrInvalidLogLevel is returned when an invalid log level is provided. +var ErrInvalidLogLevel = errors.New("invalid log level") + // EnableP2PLogging enables libp2p logging into the provided logger with the provided level. -func EnableP2PLogging(log logging.Logger, l logging.Level) { +func EnableP2PLogging(log logging.Logger, l logging.Level) error { core := loggingCore{ log: log, level: l, } + err := SetP2PLogLevel(l) + if err != nil { + return err + } + p2plogging.SetPrimaryCore(&core) + return nil +} + +// SetP2PLogLevel sets the log level for libp2p logging. +func SetP2PLogLevel(l logging.Level) error { + var seen bool for p2pLevel, logLevel := range levelsMap { if logLevel == l { p2plogging.SetAllLoggers(p2plogging.LogLevel(p2pLevel)) + seen = true break } } - p2plogging.SetPrimaryCore(&core) + if !seen { + return ErrInvalidLogLevel + } + return nil } func (c *loggingCore) Enabled(l zapcore.Level) bool { diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index f67f79f427..2c64b63eab 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -38,6 +38,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/security/noise" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -116,9 +117,14 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host. listenAddr = "" } - var disableMetrics = func(cfg *libp2p.Config) error { return nil } + var enableMetrics = func(cfg *libp2p.Config) error { cfg.DisableMetrics = false; return nil } metrics.DefaultRegistry().Register(&metrics.PrometheusDefaultMetrics) + rm, err := configureResourceManager(cfg) + if err != nil { + return nil, "", err + } + host, err := libp2p.New( libp2p.Identity(privKey), libp2p.UserAgent(ua), @@ -127,11 +133,29 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host. libp2p.Peerstore(pstore), libp2p.NoListenAddrs, libp2p.Security(noise.ID, noise.New), - disableMetrics, + enableMetrics, + libp2p.ResourceManager(rm), ) return host, listenAddr, err } +func configureResourceManager(cfg config.Local) (network.ResourceManager, error) { + // see https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager for more details + scalingLimits := rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&scalingLimits) + scaledDefaultLimits := scalingLimits.AutoScale() + + limitConfig := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + Conns: rcmgr.LimitVal(cfg.P2PIncomingConnectionsLimit), + }, + // Everything else is default. The exact values will come from `scaledDefaultLimits` above. + } + limiter := rcmgr.NewFixedLimiter(limitConfig.Build(scaledDefaultLimits)) + rm, err := rcmgr.NewResourceManager(limiter) + return rm, err +} + // MakeService creates a P2P service instance func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) { @@ -150,7 +174,6 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho return nil, err } return &serviceImpl{ - log: log, listenAddr: listenAddr, host: h, diff --git a/network/p2p/peerID_test.go b/network/p2p/peerID_test.go index 9d7729d593..beed18868c 100644 --- a/network/p2p/peerID_test.go +++ b/network/p2p/peerID_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -103,3 +104,18 @@ func TestGetPrivKeyUserGeneratedEphemeral(t *testing.T) { _, err = loadPrivateKeyFromFile(path.Join(tempdir, DefaultPrivKeyPath)) assert.True(t, os.IsNotExist(err)) } + +func TestPeerIDChallengeSigner(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + privKey, err := generatePrivKey() + require.NoError(t, err) + + data := make([]byte, 111) + crypto.RandBytes(data) + signer := PeerIDChallengeSigner{key: privKey} + pubKey := privKey.GetPublic() + pubKeyRaw, err := pubKey.Raw() + require.NoError(t, err) + require.Equal(t, crypto.PublicKey(pubKeyRaw), signer.PublicKey()) +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 4d6efdac83..9f5448c0ea 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -26,6 +26,7 @@ import ( "time" "github.com/algorand/go-algorand/config" + algocrypto "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" "github.com/algorand/go-algorand/network/limitcaller" @@ -81,6 +82,8 @@ type P2PNetwork struct { nodeInfo NodeInfo pstore *peerstore.PeerStore httpServer *p2p.HTTPServer + + identityTracker identityTracker } type bootstrapper struct { @@ -191,30 +194,8 @@ type p2pPeerStats struct { txReceived atomic.Uint64 } -// gossipSubPeer implements the DeadlineSettableConn, IPAddressable, and ErlClient interfaces. -type gossipSubPeer struct { - peerID peer.ID - net GossipNode - routingAddr [8]byte -} - -func (p gossipSubPeer) GetNetwork() GossipNode { return p.net } - -func (p gossipSubPeer) OnClose(f func()) { - net := p.GetNetwork().(*P2PNetwork) - net.wsPeersLock.Lock() - defer net.wsPeersLock.Unlock() - if wsp, ok := net.wsPeers[p.peerID]; ok { - wsp.OnClose(f) - } -} - -func (p gossipSubPeer) RoutingAddr() []byte { - return p.routingAddr[:] -} - // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service -func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo) (*P2PNetwork, error) { +func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo, identityOpts *identityOpts) (*P2PNetwork, error) { const readBufferLen = 2048 // create Peerstore and add phonebook addresses @@ -262,7 +243,17 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo broadcastQueueBulk: make(chan broadcastRequest, 100), } - p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) + if identityOpts != nil { + net.identityTracker = identityOpts.tracker + } + if net.identityTracker == nil { + net.identityTracker = noopIdentityTracker{} + } + + err = p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) + if err != nil { + return nil, err + } h, la, err := p2p.MakeHost(cfg, datadir, pstore) if err != nil { @@ -380,7 +371,16 @@ func (n *P2PNetwork) Stop() { n.wsPeersConnectivityCheckTicker = nil } n.innerStop() + + // This is a workaround for a race between PubSub.processLoop (triggered by context cancellation below) termination + // and this function returning that causes main goroutine to exit before + // PubSub.processLoop goroutine finishes logging its termination message + // to already closed logger. Not seen in wild, only in tests. + if n.log.GetLevel() >= logging.Warn { + _ = p2p.SetP2PLogLevel(logging.Warn) + } n.ctxCancel() + n.service.Close() n.bootstrapperStop() n.httpServer.Close() @@ -541,8 +541,6 @@ func (n *P2PNetwork) Disconnect(badpeer DisconnectablePeer) { n.wsPeersLock.Lock() defer n.wsPeersLock.Unlock() switch p := badpeer.(type) { - case gossipSubPeer: // Disconnect came from a message received via GossipSub - peerID, wsp = p.peerID, n.wsPeers[p.peerID] case *wsPeer: // Disconnect came from a message received via wsPeer peerID, wsp = n.wsPeersToIDs[p], p default: @@ -755,21 +753,32 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea ma := stream.Conn().RemoteMultiaddr() addr := ma.String() if addr == "" { - n.log.Warnf("Could not get address for peer %s", p2pPeer) + n.log.Warnf("Cannot get address for peer %s", p2pPeer) } - // create a wsPeer for this stream and added it to the peers map. + // create a wsPeer for this stream and added it to the peers map. addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}} maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount) client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost) if err != nil { client = nil } + var netIdentPeerID algocrypto.PublicKey + if p2pPeerPubKey, err0 := p2pPeer.ExtractPublicKey(); err0 == nil { + if b, err0 := p2pPeerPubKey.Raw(); err0 == nil { + netIdentPeerID = algocrypto.PublicKey(b) + } else { + n.log.Warnf("Cannot get raw pubkey for peer %s", p2pPeer) + } + } else { + n.log.Warnf("Cannot get pubkey for peer %s", p2pPeer) + } peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ wsPeerCore: peerCore, conn: &wsPeerConnP2PImpl{stream: stream}, outgoing: !incoming, + identity: netIdentPeerID, } protos, err := n.pstore.GetProtocols(p2pPeer) if err != nil { @@ -777,6 +786,19 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea } wsp.TelemetryGUID, wsp.InstanceName = p2p.GetPeerTelemetryInfo(protos) + localAddr, has := n.Address() + if !has { + n.log.Warn("Could not get local address") + } + n.wsPeersLock.Lock() + ok := n.identityTracker.setIdentity(wsp) + n.wsPeersLock.Unlock() + if !ok { + networkPeerIdentityDisconnect.Inc(nil) + n.log.With("remote", addr).With("local", localAddr).Warn("peer deduplicated before adding because the identity is already known") + stream.Close() + } + wsp.init(n.config, outgoingMessagesBufferSize) n.wsPeersLock.Lock() n.wsPeers[p2pPeer] = wsp @@ -790,10 +812,6 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea event = "ConnectedIn" msg = "Accepted incoming connection from peer %s" } - localAddr, has := n.Address() - if !has { - n.log.Warn("Could not get local address") - } n.log.With("event", event).With("remote", addr).With("local", localAddr).Infof(msg, p2pPeer.String()) if n.log.GetLevel() >= logging.Debug { @@ -815,6 +833,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer() n.wsPeersLock.Lock() + n.identityTracker.removeIdentity(peer) delete(n.wsPeers, remotePeerID) delete(n.wsPeersToIDs, peer) n.wsPeersLock.Unlock() @@ -913,7 +932,9 @@ func (n *P2PNetwork) txTopicHandleLoop() { func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { var routingAddr [8]byte n.wsPeersLock.Lock() - if wsp, ok := n.wsPeers[peerID]; ok { + var wsp *wsPeer + var ok bool + if wsp, ok = n.wsPeers[peerID]; ok { copy(routingAddr[:], wsp.RoutingAddr()) } else { // well, otherwise use last 8 bytes of peerID @@ -922,7 +943,8 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * n.wsPeersLock.Unlock() inmsg := IncomingMessage{ - Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, + // Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, + Sender: wsp, Tag: protocol.TxnTag, Data: msg.Data, Net: n, diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index ff1f40a63c..0eac398431 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -39,7 +39,6 @@ import ( "github.com/algorand/go-algorand/network/phonebook" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" - "github.com/algorand/go-algorand/util" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" @@ -69,7 +68,7 @@ func TestP2PSubmitTX(t *testing.T) { cfg.ForceFetchTransactions = true cfg.NetAddress = "127.0.0.1:0" log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netA.Start() defer netA.Stop() @@ -81,12 +80,12 @@ func TestP2PSubmitTX(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netB.Start() defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netC.Start() defer netC.Stop() @@ -162,7 +161,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { cfg.ForceFetchTransactions = true cfg.NetAddress = "127.0.0.1:0" log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netA.Start() defer netA.Stop() @@ -174,7 +173,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netB.Start() defer netB.Stop() @@ -193,7 +192,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { cfg.ForceFetchTransactions = false // Have to unset NetAddress to get IsGossipServer to return false cfg.NetAddress = "" - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netC.Start() defer netC.Stop() @@ -259,7 +258,7 @@ func TestP2PSubmitWS(t *testing.T) { cfg := config.GetDefaultLocal() cfg.NetAddress = "127.0.0.1:0" log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netA.Start() @@ -273,13 +272,13 @@ func TestP2PSubmitWS(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netC.Start() require.NoError(t, err) @@ -393,7 +392,7 @@ func TestP2PNetworkAddress(t *testing.T) { cfg := config.GetDefaultLocal() log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) defer netA.Stop() require.NoError(t, err) addrInfo := netA.service.AddrInfo() @@ -605,7 +604,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0]) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0], nil) require.NoError(t, err) err = netA.Start() @@ -619,13 +618,13 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1]) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1], nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2]) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2], nil) require.NoError(t, err) err = netC.Start() require.NoError(t, err) @@ -753,7 +752,7 @@ func TestP2PHTTPHandler(t *testing.T) { cfg.NetAddress = "127.0.0.1:0" log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) h := &p2phttpHandler{t, "hello", nil} @@ -822,7 +821,7 @@ func TestP2PRelay(t *testing.T) { cfg.NetAddress = "127.0.0.1:0" log := logging.TestingLog(t) log.Debugln("Starting netA") - netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netA.Start() @@ -840,7 +839,7 @@ func TestP2PRelay(t *testing.T) { // Explicitly unset NetAddress for netB cfg.NetAddress = "" log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses) - netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) err = netB.Start() require.NoError(t, err) @@ -907,7 +906,7 @@ func TestP2PRelay(t *testing.T) { // ensure all messages from netB and netC are received by netA cfg.NetAddress = "127.0.0.1:0" log.Debugf("Starting netC with phonebook addresses %v", phoneBookAddresses) - netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) require.True(t, netC.relayMessages) err = netC.Start() @@ -1155,22 +1154,3 @@ func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) { }) } } - -// TestP2PGossipSubPeerCasts checks that gossipSubPeer implements the ErlClient and IPAddressable interfaces -// needed by TxHandler -func TestP2PGossipSubPeerCasts(t *testing.T) { - partitiontest.PartitionTest(t) - t.Parallel() - - var g interface{} = gossipSubPeer{} - _, ok := g.(util.ErlClient) - require.True(t, ok) - - _, ok = g.(IPAddressable) - require.True(t, ok) - - // check that gossipSubPeer is hashable as ERL wants - var m map[util.ErlClient]struct{} - require.Equal(t, m[gossipSubPeer{}], struct{}{}) - require.Equal(t, m[g.(util.ErlClient)], struct{}{}) -} diff --git a/network/requestLogger_test.go b/network/requestLogger_test.go index 0de6a41c73..c6bde8956e 100644 --- a/network/requestLogger_test.go +++ b/network/requestLogger_test.go @@ -50,12 +50,13 @@ func TestRequestLogger(t *testing.T) { dl := eventsDetailsLogger{Logger: log, eventReceived: make(chan interface{}, 1), eventIdentifier: telemetryspec.HTTPRequestEvent} log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) netA := &WebsocketNetwork{ - log: dl, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: "go-test-network-genesis", - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: dl, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: "go-test-network-genesis", + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } netA.config.EnableRequestLogger = true netA.setup() diff --git a/network/requestTracker_test.go b/network/requestTracker_test.go index d814507c78..46f003e0f8 100644 --- a/network/requestTracker_test.go +++ b/network/requestTracker_test.go @@ -87,12 +87,13 @@ func TestRateLimiting(t *testing.T) { // This test is conducted locally, so we want to treat all hosts the same for counting incoming requests. testConfig.DisableLocalhostConnectionRateLimit = false wn := &WebsocketNetwork{ - log: log, - config: testConfig, - phonebook: phonebook.MakePhonebook(1, 1), - GenesisID: "go-test-network-genesis", - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: testConfig, + phonebook: phonebook.MakePhonebook(1, 1), + GenesisID: "go-test-network-genesis", + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } // increase the IncomingConnectionsLimit/MaxConnectionsPerIP limits, since we don't want to test these. diff --git a/network/wsNetwork.go b/network/wsNetwork.go index f222d2ff27..1c0f3e8676 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -214,9 +214,6 @@ type WebsocketNetwork struct { NetworkID protocol.NetworkID RandomID string - peerID p2p.PeerID - peerIDSigner identityChallengeSigner - ready atomic.Int32 readyChan chan struct{} @@ -631,8 +628,6 @@ func (wn *WebsocketNetwork) setup() { wn.outgoingMessagesBufferSize = outgoingMessagesBufferSize wn.wsMaxHeaderBytes = wsMaxHeaderBytes - wn.identityTracker = NewIdentityTracker() - wn.broadcaster = msgBroadcaster{ ctx: wn.ctx, log: wn.log, @@ -699,7 +694,7 @@ func (wn *WebsocketNetwork) Start() error { wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest) } - if wn.config.IsGossipServer() { + if wn.config.IsGossipServer() || wn.config.ForceRelayMessages { listener, err := net.Listen("tcp", wn.config.NetAddress) if err != nil { wn.log.Errorf("network could not listen %v: %s", wn.config.NetAddress, err) @@ -736,16 +731,11 @@ func (wn *WebsocketNetwork) Start() error { } } // if the network has a public address or a libp2p peer ID, use that as the name for connection deduplication - if wn.config.PublicAddress != "" || (wn.peerID != "" && wn.peerIDSigner != nil) { + if wn.config.PublicAddress != "" || wn.identityScheme != nil { wn.RegisterHandlers(identityHandlers) } if wn.identityScheme == nil { - if wn.peerID != "" && wn.peerIDSigner != nil { - wn.identityScheme = NewIdentityChallengeSchemeWithSigner(string(wn.peerID), wn.peerIDSigner) - } - if wn.config.PublicAddress != "" { - wn.identityScheme = NewIdentityChallengeScheme(wn.config.PublicAddress) - } + wn.identityScheme = NewIdentityChallengeScheme(NetIdentityDedupNames(wn.config.PublicAddress)) } wn.meshUpdateRequests <- meshRequest{false, nil} @@ -2115,7 +2105,8 @@ func (wn *WebsocketNetwork) tryConnect(netAddr, gossipAddr string) { var idChallenge identityChallengeValue if wn.identityScheme != nil { - idChallenge = wn.identityScheme.AttachChallenge(requestHeader, netAddr) + theirAddr := strings.ToLower(netAddr) + idChallenge = wn.identityScheme.AttachChallenge(requestHeader, theirAddr) } // for backward compatibility, include the ProtocolVersion header as well. @@ -2305,7 +2296,7 @@ func (wn *WebsocketNetwork) SetPeerData(peer Peer, key string, value interface{} } // NewWebsocketNetwork constructor for websockets based gossip network -func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo, peerID p2p.PeerID, idSigner identityChallengeSigner) (wn *WebsocketNetwork, err error) { +func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo, identityOpts *identityOpts) (wn *WebsocketNetwork, err error) { pb := phonebook.MakePhonebook(config.ConnectionsRateLimitingCount, time.Duration(config.ConnectionsRateLimitingWindowSeconds)*time.Second) @@ -2324,8 +2315,6 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre GenesisID: genesisID, NetworkID: networkID, nodeInfo: nodeInfo, - peerID: peerID, - peerIDSigner: idSigner, resolveSRVRecords: tools_network.ReadFromSRV, peerStater: peerConnectionStater{ log: log, @@ -2334,13 +2323,22 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre }, } + // initialize net identity tracker either from the provided options or with a new one + if identityOpts != nil { + wn.identityScheme = identityOpts.scheme + wn.identityTracker = identityOpts.tracker + } + if wn.identityTracker == nil { + wn.identityTracker = NewIdentityTracker() + } + wn.setup() return wn, nil } // NewWebsocketGossipNode constructs a websocket network node and returns it as a GossipNode interface implementation func NewWebsocketGossipNode(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (gn GossipNode, err error) { - return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil, "", nil) + return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil, nil) } // SetPrioScheme specifies the network priority scheme for a network node diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 038a9d6e2d..6af3a697fc 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -128,12 +128,13 @@ func makeTestWebsocketNodeWithConfig(t testing.TB, conf config.Local, opts ...te log := logging.TestingLog(t) log.SetLevel(logging.Warn) wn := &WebsocketNetwork{ - log: log, - config: conf, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: conf, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: NewIdentityTracker(), } // apply options to newly-created WebsocketNetwork, if provided for _, opt := range opts { @@ -1055,12 +1056,13 @@ func makeTestFilterWebsocketNode(t *testing.T, nodename string) *WebsocketNetwor dc.OutgoingMessageFilterBucketCount = 3 dc.OutgoingMessageFilterBucketSize = 128 wn := &WebsocketNetwork{ - log: logging.TestingLog(t).With("node", nodename), - config: dc, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: logging.TestingLog(t).With("node", nodename)}, + log: logging.TestingLog(t).With("node", nodename), + config: dc, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: logging.TestingLog(t).With("node", nodename)}, + identityTracker: noopIdentityTracker{}, } require.True(t, wn.config.EnableIncomingMessageFilter) wn.setup() @@ -1696,7 +1698,7 @@ type mockIdentityScheme struct { } func newMockIdentityScheme(t *testing.T) *mockIdentityScheme { - return &mockIdentityScheme{t: t, realScheme: NewIdentityChallengeScheme("any")} + return &mockIdentityScheme{t: t, realScheme: NewIdentityChallengeScheme(NetIdentityDedupNames("any"))} } func (i mockIdentityScheme) AttachChallenge(attach http.Header, addr string) identityChallengeValue { if i.attachChallenge != nil { @@ -1768,7 +1770,7 @@ func TestPeeringWithBadIdentityChallenge(t *testing.T) { { name: "incorrect address", attachChallenge: func(attach http.Header, addr string) identityChallengeValue { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys c := identityChallenge{ Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), @@ -1786,7 +1788,7 @@ func TestPeeringWithBadIdentityChallenge(t *testing.T) { { name: "bad signature", attachChallenge: func(attach http.Header, addr string) identityChallengeValue { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys c := identityChallenge{ Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), @@ -1901,7 +1903,7 @@ func TestPeeringWithBadIdentityChallengeResponse(t *testing.T) { { name: "incorrect original challenge", verifyAndAttachResponse: func(attach http.Header, h http.Header) (identityChallengeValue, crypto.PublicKey, error) { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys // decode the header to an identityChallenge msg, _ := base64.StdEncoding.DecodeString(h.Get(IdentityChallengeHeader)) idChal := identityChallenge{} @@ -1924,7 +1926,7 @@ func TestPeeringWithBadIdentityChallengeResponse(t *testing.T) { { name: "bad signature", verifyAndAttachResponse: func(attach http.Header, h http.Header) (identityChallengeValue, crypto.PublicKey, error) { - s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a scheme to use its keys // decode the header to an identityChallenge msg, _ := base64.StdEncoding.DecodeString(h.Get(IdentityChallengeHeader)) idChal := identityChallenge{} @@ -2056,7 +2058,7 @@ func TestPeeringWithBadIdentityVerification(t *testing.T) { resp := identityChallengeResponseSigned{} err = protocol.Decode(msg, &resp) require.NoError(t, err) - s := NewIdentityChallengeScheme("does not matter") // make a throwaway key + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a throwaway key ver := identityVerificationMessageSigned{ // fill in correct ResponseChallenge field Msg: identityVerificationMessage{ResponseChallenge: resp.Msg.ResponseChallenge}, @@ -2074,7 +2076,7 @@ func TestPeeringWithBadIdentityVerification(t *testing.T) { // when the verification signature doesn't match the peer's expectation (the previously exchanged identity), peer is disconnected name: "bad signature", verifyResponse: func(t *testing.T, h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) { - s := NewIdentityChallengeScheme("does not matter") // make a throwaway key + s := NewIdentityChallengeScheme(NetIdentityDedupNames("does not matter")) // make a throwaway key ver := identityVerificationMessageSigned{ // fill in wrong ResponseChallenge field Msg: identityVerificationMessage{ResponseChallenge: newIdentityChallengeValue()}, @@ -2566,12 +2568,13 @@ func TestSlowPeerDisconnection(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Info) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } wn.setup() wn.broadcaster.slowWritingPeerMonitorInterval = time.Millisecond * 50 @@ -2642,12 +2645,13 @@ func TestForceMessageRelaying(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } wn.setup() wn.eventualReadyDelay = time.Second @@ -2737,12 +2741,13 @@ func TestCheckProtocolVersionMatch(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - peerStater: peerConnectionStater{log: log}, + log: log, + config: defaultConfig, + phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, + peerStater: peerConnectionStater{log: log}, + identityTracker: noopIdentityTracker{}, } wn.setup() wn.supportedProtocolVersions = []string{"2", "1"} @@ -4560,7 +4565,6 @@ func TestWsNetworkPhonebookMix(t *testing.T) { "test", "net", nil, - "", nil, ) require.NoError(t, err) diff --git a/node/follower_node.go b/node/follower_node.go index 117cc56e86..7d8fc64388 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -94,7 +94,7 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo node.config = cfg // tie network, block fetcher, and agreement services together - p2pNode, err := network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, nil, "", nil) + p2pNode, err := network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, nil, nil) if err != nil { log.Errorf("could not create websocket node: %v", err) return nil, err diff --git a/node/node.go b/node/node.go index 5f1baa56be..b6118aadc0 100644 --- a/node/node.go +++ b/node/node.go @@ -206,14 +206,14 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd return nil, err } } else if cfg.EnableP2P { - p2pNode, err = network.NewP2PNetwork(node.log, node.config, rootDir, phonebookAddresses, genesis.ID(), genesis.Network, node) + p2pNode, err = network.NewP2PNetwork(node.log, node.config, rootDir, phonebookAddresses, genesis.ID(), genesis.Network, node, nil) if err != nil { log.Errorf("could not create p2p node: %v", err) return nil, err } } else { var wsNode *network.WebsocketNetwork - wsNode, err = network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, node, "", nil) + wsNode, err = network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, node, nil) if err != nil { log.Errorf("could not create websocket node: %v", err) return nil, err diff --git a/node/node_test.go b/node/node_test.go index 3ea6d4a33d..e17e3e8d3f 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -875,6 +875,7 @@ func TestNodeHybridTopology(t *testing.T) { cfg.NetAddress = ni.wsNetAddr() cfg.EnableP2PHybridMode = true + cfg.PublicAddress = ni.wsNetAddr() cfg.EnableDHTProviders = true cfg.P2PPersistPeerID = true privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) diff --git a/test/testdata/configs/config-v34.json b/test/testdata/configs/config-v34.json index 7f16155303..3a9714bbfb 100644 --- a/test/testdata/configs/config-v34.json +++ b/test/testdata/configs/config-v34.json @@ -98,6 +98,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, + "P2PIncomingConnectionsLimit": 1200, "P2PNetAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile index f4ec4b3c1f..7222fd3882 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile @@ -3,15 +3,17 @@ PARAMS=-w 20 -R 8 -N 20 -n 20 --npn-algod-nodes 10 --node-template node.json --r .PHONY: clean all +HYBRID ?= no + all: net.json genesis.json topology.json -node.json nonPartNode.json relay.json: - python3 copy-node-configs.py +node.json nonPartNode.json relay.json: copy-node-configs.py + python3 copy-node-configs.py --hybrid=${HYBRID} -net.json: node.json nonPartNode.json relay.json ${GOPATH}/bin/netgoal Makefile +net.json: node.json nonPartNode.json relay.json Makefile netgoal generate -t net -r /tmp/wat -o net.json ${PARAMS} -genesis.json: ${GOPATH}/bin/netgoal Makefile +genesis.json: Makefile netgoal generate -t genesis -r /tmp/wat -o genesis.l.json ${PARAMS} jq '.LastPartKeyRound=5000|.NetworkName="s1s-p2p"|.ConsensusProtocol="future"' < genesis.l.json > genesis.json rm genesis.l.json diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md index 1cad95bc2d..04e8b986c7 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md @@ -7,10 +7,17 @@ This is a copy of scenario1s with the following changes in nodes configuration: ## Build ```sh -export GOPATH=~/go make ``` +If want to configure a hybrid net, set the `HYBRID` mode parameter to: + - `p2p` meaning all nodes are p2pnet and 50% of them are hybrid + - `ws` meaning all nodes are wsnet and 50% of them are hybrid + +```sh +make -D HYBRID=p2p +``` + ## Run Run as usual cluster test scenario with algonet. diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py index 6ffbc01d8d..12da86f348 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py @@ -5,14 +5,120 @@ 3. Set DNSSecurityFlags: 0 to all configs """ +import argparse +import copy import json import os CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) SCENARIO1S_DIR = os.path.join(CURRENT_DIR, "..", "scenario1s") +def make_p2p_net(*args): + """convert config to a pure p2p network""" + for config in args: + override_json = json.loads(config.get("ConfigJSONOverride", "{}")) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + + net_address = config.get("NetAddress") + if net_address: + config["P2PBootstrap"] = True + altconfigs = config.get("AltConfigs", []) + if altconfigs: + for i, altconfig in enumerate(altconfigs): + override_json = json.loads(altconfig.get("ConfigJSONOverride", "{}")) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfigs[i]["ConfigJSONOverride"] = json.dumps(override_json) + config["AltConfigs"] = altconfigs + + +def make_hybrid_p2p_net(*args): + """convert config to a hybrid p2p network: + - half of relays become hybrid and receive public address + - half of non-relay nodes become hybrid + - AltConfigs are used for hybrid nodes with FractionApply=0.5 + - Only one AltConfigs is supported and its FractionApply is forced to 0.5 + """ + for config in args: + override_json = json.loads(config.get("ConfigJSONOverride", "{}")) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + + net_address = config.get("NetAddress") + if net_address: + # in p2p-only mode all relays are P2PBootstrap-able + config["P2PBootstrap"] = True + + altconfigs = config.get("AltConfigs") + altconfig = None + if altconfigs: + altconfig = altconfigs[0] + else: + altconfig = copy.deepcopy(config) + + override_json = json.loads(altconfig.get("ConfigJSONOverride", "{}")) + override_json["EnableP2PHybridMode"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfig["ConfigJSONOverride"] = json.dumps(override_json) + if net_address: # relay, set public address + altconfig["P2PBootstrap"] = True + altconfig["P2PNetAddress"] = "{{NetworkPort2}}" + altconfig["PublicAddress"] = True + altconfig['FractionApply'] = 0.5 + + altconfigs = [altconfig] + config["AltConfigs"] = altconfigs + + +def make_hybrid_ws_net(*args): + """convert config to a hybrid ws network: + - half of relays become hybrid and receive public address + - half of non-relay nodes become hybrid + - AltConfigs are used for hybrid nodes with FractionApply=0.5 + - Only one AltConfigs is supported and its FractionApply is forced to 0.5 + """ + for config in args: + override_json = json.loads(config.get("ConfigJSONOverride", "{}")) + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + + net_address = config.get("NetAddress") + altconfigs = config.get("AltConfigs") + altconfig = None + if altconfigs: + altconfig = altconfigs[0] + else: + altconfig = copy.deepcopy(config) + + override_json = json.loads(altconfig.get("ConfigJSONOverride", "{}")) + override_json["EnableP2PHybridMode"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfig["ConfigJSONOverride"] = json.dumps(override_json) + if net_address: # relay, set public address + altconfig["P2PBootstrap"] = True + altconfig["P2PNetAddress"] = "{{NetworkPort2}}" + altconfig["PublicAddress"] = True + altconfig['FractionApply'] = 0.5 + + altconfigs = [altconfig] + config["AltConfigs"] = altconfigs + + def main(): """main""" + ap = argparse.ArgumentParser() + ap.add_argument('--hybrid', type=str, help='Hybrid mode: p2p, ws') + args = ap.parse_args() + + hybrid_mode = args.hybrid + if hybrid_mode not in ("p2p", "ws"): + hybrid_mode = None + + print('Hybrid mode:', hybrid_mode) + with open(os.path.join(SCENARIO1S_DIR, "node.json"), "r") as f: node = json.load(f) with open(os.path.join(SCENARIO1S_DIR, "relay.json"), "r") as f: @@ -20,27 +126,15 @@ def main(): with open(os.path.join(SCENARIO1S_DIR, "nonPartNode.json"), "r") as f: non_part_node = json.load(f) - # make all relays P2PBootstrap'able - relay["P2PBootstrap"] = True - - # enable P2P for all configs - for config in (node, relay, non_part_node): - override = config.get("ConfigJSONOverride") - if override: - override_json = json.loads(override) - override_json["EnableP2P"] = True - override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC - config["ConfigJSONOverride"] = json.dumps(override_json) - altconfigs = config.get("AltConfigs", []) - if altconfigs: - for i, altconfig in enumerate(altconfigs): - override = altconfig.get("ConfigJSONOverride") - if override: - override_json = json.loads(override) - override_json["EnableP2P"] = True - override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC - altconfigs[i]["ConfigJSONOverride"] = json.dumps(override_json) - config["AltConfigs"] = altconfigs + if hybrid_mode == 'p2p': + print('making hybrid p2p network...') + make_hybrid_p2p_net(node, relay, non_part_node) + elif hybrid_mode == 'ws': + print('making hybrid ws network...') + make_hybrid_ws_net(node, relay, non_part_node) + else: + print('making pure p2p network...') + make_p2p_net(node, relay, non_part_node) with open("node.json", "w") as f: json.dump(node, f, indent=4)