Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: hybrid node net identity for connection deduplication #6035

Merged
merged 26 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3221ba3
Implement shared identity tracker
algorandskiy Jun 13, 2024
d2de05c
CR: fix identityOpts reuse concern
algorandskiy Jun 21, 2024
a1a33b8
gossipsub: fix pubsub.processLoop and test termination race
algorandskiy Jun 24, 2024
808ba67
netdeploy: support hybrid p2p scenarios
algorandskiy Jun 28, 2024
0c6c43e
more logging into TestHybridNetwork_DuplicateConn
algorandskiy Jul 1, 2024
4350de3
CR fixes
algorandskiy Jul 1, 2024
798722e
fix hybrid scenario generation
algorandskiy Jul 1, 2024
2fbe979
fixes to hybrid test scripts
algorandskiy Jul 1, 2024
d12f93f
do not assign NetAddress to SRV in p2p mode
algorandskiy Jul 2, 2024
9b281bd
fix p2p netaddress and publicaddress
algorandskiy Jul 2, 2024
8532f1f
remove gossibSubPeer type
algorandskiy Jul 2, 2024
3584830
fix letter case for public address in net identity
algorandskiy Jul 3, 2024
c11a329
Apply suggestions from code review
algorandskiy Jul 8, 2024
0b64ff3
Merge branch 'master' into pavel/p2p-hybrid-tests
algorandskiy Jul 8, 2024
5767554
fix build after applying GH suggestions
algorandskiy Jul 9, 2024
ef77424
CR: hybrid cluster script update
algorandskiy Jul 9, 2024
219f4de
Merge remote-tracking branch 'upstream/master' into pavel/p2p-hybrid-…
algorandskiy Jul 10, 2024
2d04b94
add a stub for connection limiting in p2p net
algorandskiy Jul 10, 2024
cea2d4d
Merge remote-tracking branch 'upstream/master' into pavel/p2p-hybrid-…
algorandskiy Jul 12, 2024
16a096c
IsGossipServer takes into account p2p and hybrid options
algorandskiy Jul 12, 2024
dfdd101
config: Add P2PIncomingConnectionsLimit
algorandskiy Jul 12, 2024
1425b70
CR fixes: fd calc and deploy net
algorandskiy Jul 17, 2024
f002752
CR fixes: IsGossipServer changes
algorandskiy Jul 17, 2024
c6ca8b1
Merge remote-tracking branch 'upstream/master' into pavel/p2p-hybrid-…
algorandskiy Jul 18, 2024
d0b973a
post merge fixes
algorandskiy Jul 18, 2024
c3cd19c
Update config/localTemplate.go
algorandskiy Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"os"
"os/user"
"path/filepath"
"strings"

"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/codecs"
Expand Down Expand Up @@ -144,7 +145,17 @@
defer f.Close()

err = loadConfig(f, &source)
if err != nil {
return source, err

Check warning on line 149 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L149

Added line #L149 was not covered by tests
}
source, err = enrichNetworkingConfig(source)
return source, err
}

// enrichNetworkingConfig makes the following tweaks to the config:
// - If NetAddress is set, enable the ledger and block services
// - If EnableP2PHybridMode is set, require PublicAddress to be set
func enrichNetworkingConfig(source Local) (Local, error) {
if source.NetAddress != "" {
source.EnableLedgerService = true
source.EnableBlockService = true
Expand All @@ -155,8 +166,13 @@
source.GossipFanout = defaultRelayGossipFanout
}
}

return source, err
// In hybrid mode we want to prevent connections from the same node over both P2P and WS.
// The only way it is supported at the moment is to use net identity challenge that is based on PublicAddress.
if (source.NetAddress != "" || source.P2PNetAddress != "") && source.EnableP2PHybridMode && source.PublicAddress == "" {
return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set")
}
source.PublicAddress = strings.ToLower(source.PublicAddress)
return source, nil
}

func loadConfig(reader io.Reader, config *Local) error {
Expand Down
185 changes: 157 additions & 28 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,62 @@ func TestLocal_MergeConfig(t *testing.T) {
require.Equal(t, c1.GossipFanout, c2.GossipFanout)
}

func TestLocal_EnrichNetworkingConfig(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

c1 := Local{
NetAddress: "test1",
GossipFanout: defaultLocal.GossipFanout,
}
c2, err := enrichNetworkingConfig(c1)
require.NoError(t, err)
require.NotEqual(t, c1, c2)
require.False(t, c1.EnableLedgerService)
require.False(t, c1.EnableBlockService)
require.Equal(t, c1.GossipFanout, defaultLocal.GossipFanout)
require.True(t, c2.EnableLedgerService)
require.True(t, c2.EnableBlockService)
require.Equal(t, c2.GossipFanout, defaultRelayGossipFanout)

c1 = Local{
EnableP2PHybridMode: true,
}
c2, err = enrichNetworkingConfig(c1)
require.NoError(t, err)

c1 = Local{
NetAddress: "test1",
EnableP2PHybridMode: true,
}
c2, err = enrichNetworkingConfig(c1)
require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set")

c1 = Local{
P2PNetAddress: "test1",
EnableP2PHybridMode: true,
}
c2, err = enrichNetworkingConfig(c1)
require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set")

c1 = Local{
EnableP2PHybridMode: true,
PublicAddress: "test2",
}
c2, err = enrichNetworkingConfig(c1)
require.NoError(t, err)
require.Equal(t, c1, c2)
require.True(t, c2.EnableP2PHybridMode)
require.NotEmpty(t, c2.PublicAddress)

c1 = Local{
PublicAddress: "R1.test3.my-domain.tld",
}
c2, err = enrichNetworkingConfig(c1)
require.NoError(t, err)
require.Equal(t, "r1.test3.my-domain.tld", c2.PublicAddress)
}

func saveFullPhonebook(phonebook phonebookBlackWhiteList, saveToDir string) error {
filename := filepath.Join(saveToDir, PhonebookFilename)
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
Expand Down Expand Up @@ -559,55 +615,128 @@ func TestLocal_IsGossipServer(t *testing.T) {

cfg := GetDefaultLocal()
require.False(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.NetAddress = ":4160"
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableGossipService = false
// EnableGossipService does not matter
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableP2P = true
cfg.NetAddress = ":4160"
require.True(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2P = false

cfg.EnableP2PHybridMode = true
// with net address set it is ws net gossip server
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.NetAddress = ""
require.False(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.P2PNetAddress = ":4190"
require.True(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.NetAddress = ":4160"
cfg.P2PNetAddress = ":4190"
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.EnableP2P = true
cfg.NetAddress = ":4160"
cfg.P2PNetAddress = ":4190"
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.EnableP2P = true
cfg.NetAddress = ":4160"
cfg.P2PNetAddress = ""
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())
}

func TestLocal_RecalculateConnectionLimits(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

var tests = []struct {
maxFDs uint64
reservedIn uint64
restSoftIn uint64
restHardIn uint64
incomingIn int

updated bool
restSoftExp uint64
restHardExp uint64
incomingExp int
maxFDs uint64
reservedIn uint64
restSoftIn uint64
restHardIn uint64
incomingIn int
p2pIncomingIn int

updated bool
restSoftExp uint64
restHardExp uint64
incomingExp int
p2pIncomingExp int
}{
{100, 10, 20, 40, 50, false, 20, 40, 50}, // no change
{100, 10, 20, 50, 50, true, 20, 40, 50}, // borrow from rest
{100, 10, 25, 50, 50, true, 25, 40, 50}, // borrow from rest
{100, 10, 50, 50, 50, true, 40, 40, 50}, // borrow from rest, update soft
{100, 10, 9, 19, 81, true, 9, 10, 80}, // borrow from both rest and incoming
{100, 10, 10, 20, 80, true, 10, 10, 80}, // borrow from both rest and incoming
{100, 50, 10, 30, 40, true, 10, 10, 40}, // borrow from both rest and incoming
{100, 90, 10, 30, 40, true, 10, 10, 0}, // borrow from both rest and incoming, clear incoming
{4096, 256, 1024, 2048, 2400, true, 1024, 1440, 2400}, // real numbers
{5000, 256, 1024, 2048, 2400, false, 1024, 2048, 2400}, // real numbers
{100, 10, 20, 40, 50, 0, false, 20, 40, 50, 0}, // no change
{100, 10, 20, 50, 50, 0, true, 20, 40, 50, 0}, // borrow from rest
{100, 10, 25, 50, 50, 0, true, 25, 40, 50, 0}, // borrow from rest
{100, 10, 25, 50, 50, 50, true, 10, 10, 40, 40}, // borrow from rest for incoming and p2p incoming
{100, 10, 50, 50, 50, 0, true, 40, 40, 50, 0}, // borrow from rest, update soft
{100, 10, 50, 50, 40, 10, true, 40, 40, 40, 10}, // borrow from rest, update soft for incoming and p2p incoming
{100, 10, 9, 19, 81, 0, true, 9, 10, 80, 0}, // borrow from both rest and incoming
{100, 10, 9, 19, 41, 41, true, 9, 10, 40, 40}, // borrow from both rest and incoming for incoming and p2p incoming
{100, 90, 10, 30, 40, 0, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming
{100, 90, 10, 30, 40, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming
{100, 90, 10, 30, 50, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming
{4096, 256, 1024, 2048, 2400, 0, true, 1024, 1440, 2400, 0}, // real numbers
{5000, 256, 1024, 2048, 2400, 0, false, 1024, 2048, 2400, 0}, // real numbers
{4096, 256, 1024, 2048, 2400, 1200, true, 240, 240, 2400, 1200}, // real numbers
{6000, 256, 1024, 2048, 2400, 1200, false, 1024, 2048, 2400, 1200}, // real numbers
}

for i, test := range tests {
test := test
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Run(fmt.Sprintf("test=%d", i), func(t *testing.T) {
t.Parallel()

c := Local{
RestConnectionsSoftLimit: test.restSoftIn,
RestConnectionsHardLimit: test.restHardIn,
IncomingConnectionsLimit: test.incomingIn,
NetAddress: ":4160",
RestConnectionsSoftLimit: test.restSoftIn,
RestConnectionsHardLimit: test.restHardIn,
IncomingConnectionsLimit: test.incomingIn,
P2PIncomingConnectionsLimit: test.p2pIncomingIn,
}
if test.p2pIncomingIn > 0 {
c.EnableP2PHybridMode = true
c.P2PNetAddress = ":4190"
}
requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn)
requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn) + uint64(test.p2pIncomingIn)
res := c.AdjustConnectionLimits(requireFDs, test.maxFDs)
require.Equal(t, test.updated, res)
require.Equal(t, test.restSoftExp, c.RestConnectionsSoftLimit)
require.Equal(t, test.restHardExp, c.RestConnectionsHardLimit)
require.Equal(t, test.incomingExp, c.IncomingConnectionsLimit)
require.Equal(t, int(test.restSoftExp), int(c.RestConnectionsSoftLimit))
require.Equal(t, int(test.restHardExp), int(c.RestConnectionsHardLimit))
require.Equal(t, int(test.incomingExp), int(c.IncomingConnectionsLimit))
require.Equal(t, int(test.p2pIncomingExp), int(c.P2PIncomingConnectionsLimit))
})
}
}
Expand Down
42 changes: 35 additions & 7 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type Local struct {
// Estimating 1.5MB per incoming connection, 1.5MB*2400 = 3.6GB
IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800" version[27]:"2400"`

P2PIncomingConnectionsLimit int `version[34]:"1200"`

// BroadcastConnectionsLimit specifies the number of connections that
// will receive broadcast (gossip) messages from this node. If the
// node has more connections than this number, it will send broadcasts
Expand Down Expand Up @@ -602,6 +604,7 @@ type Local struct {
EnableP2P bool `version[31]:"false"`

// EnableP2PHybridMode turns on both websockets and P2P networking.
// Enabling this setting also requires PublicAddress to be set.
EnableP2PHybridMode bool `version[34]:"false"`

// P2PNetAddress sets the listen address used for P2P networking, if hybrid mode is set.
Expand Down Expand Up @@ -734,10 +737,21 @@ func (cfg Local) TxFilterCanonicalEnabled() bool {
return cfg.TxIncomingFilteringFlags&txFilterCanonical != 0
}

// IsGossipServer returns true if NetAddress is set and this node supposed
// to start websocket server
// IsGossipServer returns true if this node supposed to start websocket or p2p server
func (cfg Local) IsGossipServer() bool {
return cfg.NetAddress != ""
return cfg.IsWsGossipServer() || cfg.IsP2PGossipServer()
}

// IsWsGossipServer returns true if a node configured to run a listening ws net
func (cfg Local) IsWsGossipServer() bool {
// 1. NetAddress is set and EnableP2P is not set
// 2. NetAddress is set and EnableP2PHybridMode is set then EnableP2P is overridden by EnableP2PHybridMode
return cfg.NetAddress != "" && (!cfg.EnableP2P || cfg.EnableP2PHybridMode)
}

// IsP2PGossipServer returns true if a node configured to run a listening p2p net
func (cfg Local) IsP2PGossipServer() bool {
return cfg.EnableP2P && !cfg.EnableP2PHybridMode && cfg.NetAddress != "" || cfg.EnableP2PHybridMode && cfg.P2PNetAddress != ""
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// ensureAbsGenesisDir will convert a path to absolute, and will attempt to make a genesis directory there
Expand Down Expand Up @@ -935,10 +949,24 @@ func (cfg *Local) AdjustConnectionLimits(requiredFDs, maxFDs uint64) bool {
if cfg.RestConnectionsHardLimit <= diff+reservedRESTConns {
restDelta := diff + reservedRESTConns - cfg.RestConnectionsHardLimit
cfg.RestConnectionsHardLimit = reservedRESTConns
if cfg.IncomingConnectionsLimit > int(restDelta) {
cfg.IncomingConnectionsLimit -= int(restDelta)
} else {
cfg.IncomingConnectionsLimit = 0
splitRatio := 1
if cfg.IsWsGossipServer() && cfg.IsP2PGossipServer() {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
// split the rest of the delta between ws and p2p evenly
splitRatio = 2
}
if cfg.IsWsGossipServer() {
if cfg.IncomingConnectionsLimit > int(restDelta) {
cfg.IncomingConnectionsLimit -= int(restDelta) / splitRatio
} else {
cfg.IncomingConnectionsLimit = 0
}
}
if cfg.IsP2PGossipServer() {
if cfg.P2PIncomingConnectionsLimit > int(restDelta) {
cfg.P2PIncomingConnectionsLimit -= int(restDelta) / splitRatio
} else {
cfg.P2PIncomingConnectionsLimit = 0
}
}
} else {
cfg.RestConnectionsHardLimit -= diff
Expand Down
1 change: 1 addition & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var defaultLocal = Local{
OptimizeAccountsDatabaseOnStartup: false,
OutgoingMessageFilterBucketCount: 3,
OutgoingMessageFilterBucketSize: 128,
P2PIncomingConnectionsLimit: 1200,
P2PNetAddress: "",
P2PPersistPeerID: false,
P2PPrivateKeyLocation: "",
Expand Down
24 changes: 20 additions & 4 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,21 @@

if cfg.IsGossipServer() {
var ot basics.OverflowTracker
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)+network.ReservedHealthServiceConnections)
fdRequired = ot.Add(fdRequired, network.ReservedHealthServiceConnections)

Check warning on line 152 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L152

Added line #L152 was not covered by tests
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit or IncomingConnectionsLimit")
return errors.New("Initialize() overflowed when adding up ReservedHealthServiceConnections to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit")

Check warning on line 154 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L154

Added line #L154 was not covered by tests
}
if cfg.IsWsGossipServer() {
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit))
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease IncomingConnectionsLimit")

Check warning on line 159 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L156-L159

Added lines #L156 - L159 were not covered by tests
}
}
if cfg.IsP2PGossipServer() {
fdRequired = ot.Add(fdRequired, uint64(cfg.P2PIncomingConnectionsLimit))
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up P2PIncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease P2PIncomingConnectionsLimit")

Check warning on line 165 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L162-L165

Added lines #L162 - L165 were not covered by tests
}
}
_, hard, fdErr := util.GetFdLimits()
if fdErr != nil {
Expand All @@ -164,14 +176,18 @@
// but try to keep cfg.ReservedFDs untouched by decreasing other limits
if cfg.AdjustConnectionLimits(fdRequired, hard) {
s.log.Warnf(
"Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d",
"Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d, P2PIncomingConnectionsLimit=%d",

Check warning on line 179 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L179

Added line #L179 was not covered by tests
cfg.RestConnectionsSoftLimit,
cfg.RestConnectionsHardLimit,
cfg.IncomingConnectionsLimit,
cfg.P2PIncomingConnectionsLimit,

Check warning on line 183 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L183

Added line #L183 was not covered by tests
)
if cfg.IncomingConnectionsLimit == 0 {
if cfg.IsWsGossipServer() && cfg.IncomingConnectionsLimit == 0 {

Check warning on line 185 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L185

Added line #L185 was not covered by tests
return errors.New("Initialize() failed to adjust connection limits")
}
if cfg.IsP2PGossipServer() && cfg.P2PIncomingConnectionsLimit == 0 {
return errors.New("Initialize() failed to adjust p2p connection limits")

Check warning on line 189 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L188-L189

Added lines #L188 - L189 were not covered by tests
}
}
}
fdErr = util.SetFdSoftLimit(maxFDs)
Expand Down
1 change: 1 addition & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"OptimizeAccountsDatabaseOnStartup": false,
"OutgoingMessageFilterBucketCount": 3,
"OutgoingMessageFilterBucketSize": 128,
"P2PIncomingConnectionsLimit": 1200,
"P2PNetAddress": "",
"P2PPersistPeerID": false,
"P2PPrivateKeyLocation": "",
Expand Down
Loading
Loading