Skip to content

Commit

Permalink
fix gossip protocol and discovery flow
Browse files Browse the repository at this point in the history
  • Loading branch information
pentateu committed Apr 15, 2024
1 parent 62e0322 commit 11ea3ee
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 114 deletions.
1 change: 1 addition & 0 deletions moleculer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ type Middleware interface {
}
type Node interface {
GetID() string
GetHost() string
ExportAsMap() map[string]interface{}
IsAvailable() bool
GetIpList() []string
Expand Down
10 changes: 10 additions & 0 deletions registry/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ func interfaceToString(list []interface{}) []string {
return result
}

func (node *Node) GetHost() string {
if node.GetUdpAddress() != "" {
return node.GetUdpAddress()
}
if len(node.GetIpList()) > 0 {
return node.GetIpList()[0]
}
return ""
}

// ExportAsMap export the node info as a map
// this map is used to publish the node info to other nodes.
func (node *Node) ExportAsMap() map[string]interface{} {
Expand Down
1 change: 1 addition & 0 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (registry *ServiceRegistry) AddOfflineNode(nodeID string, hostname, ipAddre
"ipList": []string{ipAddress},
})
registry.nodes.Add(node)
// registry.GetLocalNode().IncreaseSequence()
return node
}

Expand Down
15 changes: 10 additions & 5 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func (service *Service) AsMap() map[string]interface{} {
serviceInfo["settings"] = service.settings
serviceInfo["metadata"] = service.metadata
serviceInfo["nodeID"] = service.nodeID
serviceInfo["fullName"] = service.fullname

if service.nodeID == "" {
panic("no service.nodeID")
Expand Down Expand Up @@ -458,7 +459,7 @@ func (service *Service) AddEventMap(eventInfo map[string]interface{}) *Event {
return &serviceEvent
}

//UpdateFromMap update the service metadata and settings from a serviceInfo map
// UpdateFromMap update the service metadata and settings from a serviceInfo map
func (service *Service) UpdateFromMap(serviceInfo map[string]interface{}) {
service.settings = serviceInfo["settings"].(map[string]interface{})
service.metadata = serviceInfo["metadata"].(map[string]interface{})
Expand All @@ -482,9 +483,13 @@ func populateFromMap(service *Service, serviceInfo map[string]interface{}) {
}
service.version = ParseVersion(serviceInfo["version"])
service.name = serviceInfo["name"].(string)
service.fullname = JoinVersionToName(
service.name,
service.version)
if fullName, ok := serviceInfo["fullName"]; ok {
service.fullname = fullName.(string)
} else {
service.fullname = JoinVersionToName(
service.name,
service.version)
}

service.settings = serviceInfo["settings"].(map[string]interface{})
service.metadata = serviceInfo["metadata"].(map[string]interface{})
Expand Down Expand Up @@ -822,7 +827,7 @@ func getName(obj interface{}) (string, error) {
}

// objToSchema create a service schema based on a object.
//checks if
// checks if
func objToSchema(obj interface{}) (moleculer.ServiceSchema, error) {
schema := moleculer.ServiceSchema{}
name, err := getName(obj)
Expand Down
170 changes: 96 additions & 74 deletions transit/tcp/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,71 @@ func (transporter *TCPTransporter) startGossipTimer() {
transporter.gossipTimer = time.NewTicker(time.Second * time.Duration(transporter.options.GossipPeriod))
go func() {
for range transporter.gossipTimer.C {
transporter.sendGossipRequest(false)
transporter.sendGossipRequest("")
}
}()
}

func (transporter *TCPTransporter) sendGossipRequest(broadcast bool) {
func (transporter *TCPTransporter) getRandomNode(nodes []moleculer.Node) moleculer.Node {
if len(nodes) == 0 {
return nil
}
nonLocalNodes := []moleculer.Node{}
for _, node := range nodes {
if !node.IsLocal() {
nonLocalNodes = append(nonLocalNodes, node)
}
}
if len(nonLocalNodes) == 0 {
return nil
}
return nonLocalNodes[rand.Intn(len(nonLocalNodes))]
}

transporter.logger.Trace("Sending gossip request")
func (transporter *TCPTransporter) sendGossip(nodeID string, payload moleculer.Payload) {
node := transporter.registry.GetNodeByID(nodeID)
if !node.IsLocal() {
transporter.logger.Trace("Sending gossip request to "+node.GetID(), "payload:", util.PrettyPrintMap(payload.RawMap()))
transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_REQ), node.GetID(), payload)
}
}

func (transporter *TCPTransporter) sendGossipHello(nodeID string) {
localNode := transporter.registry.GetLocalNode()
payload := payloadPkg.Empty()
payload.Add("sender", localNode.GetID())
payload.Add("host", localNode.GetHost())
payload.Add("port", localNode.GetPort())

transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_HELLO), nodeID, payload)
}

func (transporter *TCPTransporter) onGossipHello(fromAddrss string, payload moleculer.Payload) {
sender := payload.Get("sender").String()
port := payload.Get("port").Int()
hostname := payload.Get("host").String()

transporter.logger.Debug("Received gossip hello from sender: ", sender, "ipAddress: ", fromAddrss, " hostname: ", hostname)

node := transporter.registry.GetNodeByID(sender)
if node == nil {
transporter.logger.Debug("Unknown node. Register as offline node - sender: ", sender)
node = transporter.registry.AddOfflineNode(sender, hostname, fromAddrss, port)
}
if node.GetUdpAddress() == "" {
node.UpdateInfo(map[string]interface{}{
"udpAddress": fromAddrss,
})
}
node.Available()
transporter.logger.Trace("will send a gossip response to node: " + sender)
transporter.onGossipRequest(payloadPkg.Empty().Add("sender", sender))
}

func (transporter *TCPTransporter) sendGossipRequest(target string) {
transporter.logger.Trace("Sending gossip request")
node := transporter.registry.GetLocalNode()
node.UpdateMetrics()

onlineResponse := map[string]interface{}{}
offlineResponse := map[string]interface{}{}
onlineNodes := []moleculer.Node{}
Expand All @@ -51,77 +104,47 @@ func (transporter *TCPTransporter) sendGossipRequest(broadcast bool) {
}

if len(onlineResponse) > 0 {
if broadcast {
transporter.broadcastGossipToNodes(payload, onlineNodes)
var targetNode moleculer.Node
if target != "" {
transporter.logger.Debug("target node is specified target:", target)
targetNode = transporter.registry.GetNodeByID(target)
}
if targetNode == nil {
targetNode = transporter.getRandomNode(onlineNodes)
if targetNode != nil {
transporter.logger.Trace("selected a random node to send gossip request - nodeId:", targetNode.GetID())
} else {
transporter.logger.Trace("could not select a random node - online nodes size:", len(onlineNodes))
}
}
if targetNode != nil {
transporter.sendGossip(targetNode.GetID(), payload)
} else {
transporter.sendGossipToRandomEndpoint(payload, onlineNodes)
transporter.logger.Debug("No target node found for gossip request - target param:", target, " online nodes size:", len(onlineNodes))
}
}

if len(offlineNodes) > 0 {
ratio := float64(len(offlineNodes)) / float64(len(onlineNodes)+1)
if ratio >= 1 || rand.Float64() < ratio {
transporter.sendGossipToRandomEndpoint(payload, offlineNodes)
randomNode := transporter.getRandomNode(offlineNodes)
transporter.sendGossip(randomNode.GetID(), payload)
}
}
}

func (transporter *TCPTransporter) broadcastGossipToNodes(payload moleculer.Payload, nodes []moleculer.Node) {
if len(nodes) == 0 {
return
}
for _, node := range nodes {
if !node.IsLocal() {
transporter.logger.Trace("Sending gossip request to "+node.GetID(), "payload:", payload)
transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_REQ), node.GetID(), payload)
}
}
}

func (transporter *TCPTransporter) sendGossipToRandomEndpoint(payload moleculer.Payload, nodes []moleculer.Node) {
if len(nodes) == 0 {
return
}
node := nodes[rand.Intn(len(nodes))]
if !node.IsLocal() {
transporter.logger.Trace("Sending gossip request to "+node.GetID(), "payload:", payload)
transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_REQ), node.GetID(), payload)
}
}

func (transporter *TCPTransporter) onGossipHello(fromAddrss string, msgBytes *[]byte) {
payload := transporter.serializer.BytesToPayload(msgBytes)
sender := payload.Get("sender").String()
port := payload.Get("port").Int()
hostname := payload.Get("host").String()

transporter.logger.Debug("Received gossip hello from sender: ", sender, "ipAddress: ", fromAddrss, " hostname: ", hostname)

node := transporter.registry.GetNodeByID(sender)
if node == nil {
transporter.logger.Debug("Unknown node. Register as offline node - sender: ", sender)
node = transporter.registry.AddOfflineNode(sender, hostname, fromAddrss, port)
}
if node.GetUdpAddress() == "" {
node.UpdateInfo(map[string]interface{}{
"udpAddress": fromAddrss,
})
}
}

func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) {
payload := transporter.serializer.BytesToPayload(msgBytes)
func (transporter *TCPTransporter) onGossipRequest(payload moleculer.Payload) {
sender := payload.Get("sender").String()

transporter.logger.Trace("Received gossip request from " + sender)
transporter.logger.Debug("onGossipRequest() - sender:" + sender)

onlineResponse := map[string]interface{}{}
offlineResponse := map[string]interface{}{}

transporter.registry.ForEachNode(func(node moleculer.Node) bool {
onlineMap := payload.Get("online")
offlineMap := payload.Get("offline")

onlineMap := payload.Get("online")
offlineMap := payload.Get("offline")
transporter.registry.ForEachNode(func(node moleculer.Node) bool {
var seq int64 = 0
var cpuSeq int64 = 0
var cpu int64 = 0
Expand All @@ -145,16 +168,6 @@ func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) {
}
}

if node.IsLocal() {
node.UpdateMetrics()
info := node.ExportAsMap()
seq = node.GetSequence()
cpu = node.GetCpu()
cpuSeq = node.GetCpuSequence()
onlineResponse[node.GetID()] = []interface{}{info, node.GetCpuSequence(), node.GetCpu()}
// transporter.logger.Debug("Node is local - send back the node info and cpu, cpuSed to "+node.GetID(), " seq: ", seq, " cpuSeq: ", cpuSeq, " cpu: ", cpu, " info: ", util.PrettyPrintMap(info))
}

if seq != 0 && seq < node.GetSequence() {
transporter.logger.Debug("We have newer info or requester doesn't know it")
if node.IsAvailable() {
Expand Down Expand Up @@ -215,7 +228,7 @@ func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) {
transporter.logger.Debug("CPU info updated for " + node.GetID())
} else if cpuSeq < node.GetCpuSequence() {
// We have newer info, send back
onlineResponse[node.GetID()] = []interface{}{node.GetCpuSequence(), node.GetCpu()}
onlineResponse[node.GetID()] = []interface{}{node.ExportAsMap(), node.GetCpuSequence(), node.GetCpu()}
transporter.logger.Debug("CPU info sent back to " + node.GetID())
}
} else {
Expand All @@ -227,26 +240,34 @@ func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) {
return true
})

localNode := transporter.registry.GetLocalNode()
localNode.UpdateMetrics()
info := localNode.ExportAsMap()
onlineResponse[localNode.GetID()] = []interface{}{info, localNode.GetCpuSequence(), localNode.GetCpu()}

if len(onlineResponse) > 0 || len(offlineResponse) > 0 {
sender := payload.Get("sender").String()
responsePayload := payloadPkg.
Empty().
Add("online", onlineResponse).
Add("offline", offlineResponse).
Add("sender", transporter.registry.GetLocalNode().GetID())
if len(onlineResponse) > 0 {
responsePayload.Add("online", onlineResponse)
}
if len(offlineResponse) > 0 {
responsePayload.Add("offline", offlineResponse)
}
transporter.logger.Trace("Gossip response sent to "+sender, " payload:", util.PrettyPrintMap(responsePayload.RawMap()))
transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_RES), sender, responsePayload)
transporter.logger.Trace("Gossip response sent to " + sender)
} else {
transporter.logger.Trace("No response sent to " + sender)
}

}

func (transporter *TCPTransporter) onGossipResponse(msgBytes *[]byte) {
payload := transporter.serializer.BytesToPayload(msgBytes)
func (transporter *TCPTransporter) onGossipResponse(payload moleculer.Payload) {
sender := payload.Get("sender").String()

transporter.logger.Trace("Received gossip response from " + sender)
transporter.logger.Trace("Received gossip response from "+sender, " payload:", util.PrettyPrintMap(payload.RawMap()))

online := payload.Get("online")
offline := payload.Get("offline")
Expand All @@ -255,6 +276,7 @@ func (transporter *TCPTransporter) onGossipResponse(msgBytes *[]byte) {
transporter.logger.Trace("Received online info from nodeID: " + sender)
online.ForEach(func(key interface{}, value moleculer.Payload) bool {
nodeID, ok := key.(string)
transporter.logger.Debug("Received online info from nodeID: " + nodeID)
if !ok {
transporter.logger.Error("Error parsing online nodeID")
return true
Expand Down
16 changes: 12 additions & 4 deletions transit/tcp/tcp-reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const (

type OnMessageFunc func(fromAddrss string, msgType int, msgBytes *[]byte)

type OnConnectionFunc func(address, host string, port int)

type TcpReader struct {
port int
listener net.Listener
Expand All @@ -27,16 +29,18 @@ type TcpReader struct {
lock sync.Mutex
state State
maxPacketSize int
onConnection OnConnectionFunc
onMessage OnMessageFunc
disconnectNodeByAddress func(address string)
}

func NewTcpReader(port int, onMessage OnMessageFunc, disconnectNodeByAddress func(address string), logger *log.Entry) *TcpReader {
func NewTcpReader(port int, onMessage OnMessageFunc, onConnection OnConnectionFunc, disconnectNodeByAddress func(address string), logger *log.Entry) *TcpReader {
return &TcpReader{
port: port,
sockets: make(map[net.Conn]bool),
logger: logger,
onMessage: onMessage,
onConnection: onConnection,
disconnectNodeByAddress: disconnectNodeByAddress,
}
}
Expand Down Expand Up @@ -85,12 +89,16 @@ func (r *TcpReader) Listen() (int, error) {

func (r *TcpReader) handleConnection(conn net.Conn) {
address := conn.RemoteAddr().String()
host, _, err := net.SplitHostPort(address)
host, port, err := net.SplitHostPort(address)
if err != nil {
r.logger.Error("Failed to split host and port - address:", address)
}

r.logger.Debugf("New TCP client connected from '%s'\n", address)
postInt, err := strconv.Atoi(port)
if err != nil {
r.logger.Error("Failed to convert port to integer - port:", port)
}
r.onConnection(address, host, postInt)
for err == nil {
msgType, msgBytes, e := r.readMessage(conn)
err = e
Expand All @@ -104,7 +112,7 @@ func (r *TcpReader) handleConnection(conn net.Conn) {
}
break
}
r.logger.Trace("handleConnection() message read from socket - msgType: ", msgType, "message:", string(msgBytes))

r.onMessage(host, msgType, &msgBytes)
}
r.closeSocket(conn)
Expand Down
Loading

0 comments on commit 11ea3ee

Please sign in to comment.