Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autosharding content-topic config #696

Merged
merged 14 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,22 @@ var (
})
Topics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "topic",
Usage: "Pubsub topic to subscribe to. Argument may be repeated",
Usage: "Default topic to subscribe to. Argument may be repeated. Deprecated! Please use pubsub-topic and/or content-topic instead.",
Destination: &options.Relay.Topics,
EnvVars: []string{"WAKUNODE2_TOPICS"},
})
PubSubTopics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "pubsub-topic",
Usage: "Default pubsub topic to subscribe to. Argument may be repeated.",
Destination: &options.Relay.PubSubTopics,
EnvVars: []string{"WAKUNODE2_PUBSUB_TOPICS"},
})
ContentTopics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "content-topic",
Usage: "Default content topic to subscribe to. Argument may be repeated.",
Destination: &options.Relay.ContentTopics,
EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"},
})
ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "protected-topic",
Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
Expand Down Expand Up @@ -301,14 +313,6 @@ var (
Value: true,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"},
})
StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "store-resume-peer",
Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated",
Value: &cliutils.MultiaddrSlice{
Values: &options.Store.ResumeNodes,
},
EnvVars: []string{"WAKUNODE2_STORE_RESUME_PEER"},
})
FilterFlag = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "filter",
Usage: "Enable filter protocol",
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func main() {
AgentString,
Relay,
Topics,
ContentTopics,
PubSubTopics,
ProtectedTopics,
RelayPeerExchange,
MinRelayPeersToPublish,
Expand All @@ -59,7 +61,6 @@ func main() {
StoreMessageRetentionCapacity,
StoreMessageDBVacuum,
StoreMessageDBMigration,
StoreResumePeer,
FilterFlag,
FilterNode,
FilterTimeout,
Expand Down
98 changes: 66 additions & 32 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
dssql "github.com/ipfs/go-ds-sql"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand All @@ -48,6 +47,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
Expand Down Expand Up @@ -305,6 +305,9 @@ func Execute(options NodeOptions) {
addStaticPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1)
}

//Process pubSub and contentTopics specified and arrive at all corresponding pubSubTopics
pubSubTopicMap := processTopics(options)

if err = wakuNode.Start(ctx); err != nil {
logger.Fatal("starting waku node", zap.Error(err))
}
Expand All @@ -318,14 +321,10 @@ func Execute(options NodeOptions) {
addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1)

if len(options.Relay.Topics.Value()) == 0 {
options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic)
}

var wg sync.WaitGroup

if options.Relay.Enable {
for _, nodeTopic := range options.Relay.Topics.Value() {
for nodeTopic := range pubSubTopicMap {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
failOnErr(err, "Error subscring to topic")
Expand Down Expand Up @@ -435,32 +434,6 @@ func Execute(options NodeOptions) {
}
}

if options.Store.Enable && len(options.Store.ResumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic

var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}

for _, t := range options.Relay.Topics.Value() {
wg.Add(1)
go func(topic string) {
defer wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil {
logger.Error("Could not resume history", zap.Error(err))
}
}(t)
}
}

var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
Expand Down Expand Up @@ -507,6 +480,36 @@ func Execute(options NodeOptions) {
}
}

func processTopics(options NodeOptions) map[string]struct{} {
//Using a map to avoid duplicate pub-sub topics that can result from autosharding
// or same-topic being passed twice.
pubSubTopicMap := make(map[string]struct{})

for _, topic := range options.Relay.Topics.Value() {
pubSubTopicMap[topic] = struct{}{}
}

for _, topic := range options.Relay.PubSubTopics.Value() {
pubSubTopicMap[topic] = struct{}{}
}

//Get pubSub topics from contentTopics if they are as per autosharding
for _, cTopic := range options.Relay.ContentTopics.Value() {
contentTopic, err := wprotocol.StringToContentTopic(cTopic)
if err != nil {
failOnErr(err, "failed to parse content topic")
}
pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
pubSubTopicMap[pTopic.String()] = struct{}{}
}
//If no topics are passed, then use default waku topic.
if len(pubSubTopicMap) == 0 {
pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{}
}

return pubSubTopicMap
}

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...)
Expand Down Expand Up @@ -534,6 +537,37 @@ func loadPrivateKeyFromFile(path string, passwd string) (*ecdsa.PrivateKey, erro
return crypto.ToECDSA(pKey)
}

/*
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
Commenting this until redesign of FT store
func processStoreResume(ctx context.Context, pubSubTopicMap map[string]struct{},
wakuNode *node.WakuNode, wg *sync.WaitGroup, logger *zap.Logger) {
if options.Store.Enable && len(options.Store.ResumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic

var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}

for t := range pubSubTopicMap {
wg.Add(1)
go func(topic string) {
defer wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil {
logger.Error("Could not resume history", zap.Error(err))
}
}(t)
}
}
} */

func getPrivKey(options NodeOptions) (*ecdsa.PrivateKey, error) {
var prvKey *ecdsa.PrivateKey
// get private key from nodeKey or keyFile
Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type RelayOptions struct {
Enable bool
Topics cli.StringSlice
ProtectedTopics []cliutils.ProtectedTopic
PubSubTopics cli.StringSlice
ContentTopics cli.StringSlice
PeerExchange bool
MinRelayPeersToPublish int
}
Expand Down Expand Up @@ -77,10 +79,10 @@ type StoreOptions struct {
DatabaseURL string
RetentionTime time.Duration
RetentionMaxMessages int
ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool
//ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool
}

// DNSDiscoveryOptions are settings used for enabling DNS-based discovery
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type StaticShardingPubsubTopic struct {
}

// NewStaticShardingPubsubTopic creates a new pubSub topic
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsubTopic {
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func FromBitVector(buf []byte) (RelayShards, error) {

// GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic
// This is based on Autosharding algorithm defined in RFC 51
func GetShardFromContentTopic(topic ContentTopic, shardCount int) NamespacedPubsubTopic {
func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic {
bytes := []byte(topic.ApplicationName)
bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...)

Expand Down
61 changes: 61 additions & 0 deletions waku/v2/protocol/topic_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package protocol

import (
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -53,6 +55,65 @@ func TestContentTopicAndSharding(t *testing.T) {
require.Equal(t, ct5.Generation, 0)
}

func randomContentTopic() (ContentTopic, error) {
var app = ""
const WordLength = 5
rand.New(rand.NewSource(time.Now().Unix()))

//Generate a random character between lowercase a to z
for i := 0; i < WordLength; i++ {
randomChar := 'a' + rune(rand.Intn(26))
app = app + string(randomChar)
}
version := uint32(1)

var name = ""

for i := 0; i < WordLength; i++ {
randomChar := 'a' + rune(rand.Intn(26))
name = name + string(randomChar)
}
var enc = "proto"

return NewContentTopic(app, version, name, enc)
}

func TestShardChoiceSimulation(t *testing.T) {
//Given
var topics []ContentTopic
for i := 0; i < 100000; i++ {
ct, err := randomContentTopic()
require.NoError(t, err)
topics = append(topics, ct)
}

var counts [GenerationZeroShardsCount]int

// When
for _, topic := range topics {
pubsub := GetShardFromContentTopic(topic, GenerationZeroShardsCount)
counts[pubsub.Shard()]++
}

t.Logf("Total number of topics simulated %d", len(topics))
for i := 0; i < GenerationZeroShardsCount; i++ {
t.Logf("Topics assigned to shard %d is %d", i, counts[i])
}

// Then
for i := 1; i < GenerationZeroShardsCount; i++ {
//t.Logf("float64(counts[%d]) %f float64(counts[%d]) %f", i-1, float64(counts[i-1]), i, float64(counts[i]))
if float64(counts[i-1]) <= (float64(counts[i])*1.05) &&
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
float64(counts[i]) <= (float64(counts[i-1])*1.05) &&
float64(counts[i-1]) >= (float64(counts[i])*0.95) &&
float64(counts[i]) >= (float64(counts[i-1])*0.95) {
t.Logf("Shard choice simulation successful")
} else {
t.FailNow()
}
}
}

func TestNsPubsubTopic(t *testing.T) {
ns1 := NewNamedShardingPubsubTopic("waku-dev")
require.Equal(t, "/waku/2/waku-dev", ns1.String())
Expand Down