From 11ea3ee61858591bc93d327102b22411aab5e138 Mon Sep 17 00:00:00 2001 From: Rafael Almeida Date: Mon, 15 Apr 2024 22:52:39 +1200 Subject: [PATCH] fix gossip protocol and discovery flow --- moleculer.go | 1 + registry/node.go | 10 ++ registry/registry.go | 1 + service/service.go | 15 ++- transit/tcp/gossip.go | 170 +++++++++++++++++++-------------- transit/tcp/tcp-reader.go | 16 +++- transit/tcp/tcp-transporter.go | 59 ++++++------ 7 files changed, 158 insertions(+), 114 deletions(-) diff --git a/moleculer.go b/moleculer.go index 85c99e2b..835be23c 100644 --- a/moleculer.go +++ b/moleculer.go @@ -223,6 +223,7 @@ type Middleware interface { } type Node interface { GetID() string + GetHost() string ExportAsMap() map[string]interface{} IsAvailable() bool GetIpList() []string diff --git a/registry/node.go b/registry/node.go index 1f8df2e0..627bb44b 100644 --- a/registry/node.go +++ b/registry/node.go @@ -240,6 +240,16 @@ func interfaceToString(list []interface{}) []string { return result } +func (node *Node) GetHost() string { + if node.GetUdpAddress() != "" { + return node.GetUdpAddress() + } + if len(node.GetIpList()) > 0 { + return node.GetIpList()[0] + } + return "" +} + // ExportAsMap export the node info as a map // this map is used to publish the node info to other nodes. func (node *Node) ExportAsMap() map[string]interface{} { diff --git a/registry/registry.go b/registry/registry.go index 2d273188..1f0e9247 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -124,6 +124,7 @@ func (registry *ServiceRegistry) AddOfflineNode(nodeID string, hostname, ipAddre "ipList": []string{ipAddress}, }) registry.nodes.Add(node) + // registry.GetLocalNode().IncreaseSequence() return node } diff --git a/service/service.go b/service/service.go index 809570fe..b9fd8dbd 100644 --- a/service/service.go +++ b/service/service.go @@ -360,6 +360,7 @@ func (service *Service) AsMap() map[string]interface{} { serviceInfo["settings"] = service.settings serviceInfo["metadata"] = service.metadata serviceInfo["nodeID"] = service.nodeID + serviceInfo["fullName"] = service.fullname if service.nodeID == "" { panic("no service.nodeID") @@ -458,7 +459,7 @@ func (service *Service) AddEventMap(eventInfo map[string]interface{}) *Event { return &serviceEvent } -//UpdateFromMap update the service metadata and settings from a serviceInfo map +// UpdateFromMap update the service metadata and settings from a serviceInfo map func (service *Service) UpdateFromMap(serviceInfo map[string]interface{}) { service.settings = serviceInfo["settings"].(map[string]interface{}) service.metadata = serviceInfo["metadata"].(map[string]interface{}) @@ -482,9 +483,13 @@ func populateFromMap(service *Service, serviceInfo map[string]interface{}) { } service.version = ParseVersion(serviceInfo["version"]) service.name = serviceInfo["name"].(string) - service.fullname = JoinVersionToName( - service.name, - service.version) + if fullName, ok := serviceInfo["fullName"]; ok { + service.fullname = fullName.(string) + } else { + service.fullname = JoinVersionToName( + service.name, + service.version) + } service.settings = serviceInfo["settings"].(map[string]interface{}) service.metadata = serviceInfo["metadata"].(map[string]interface{}) @@ -822,7 +827,7 @@ func getName(obj interface{}) (string, error) { } // objToSchema create a service schema based on a object. -//checks if +// checks if func objToSchema(obj interface{}) (moleculer.ServiceSchema, error) { schema := moleculer.ServiceSchema{} name, err := getName(obj) diff --git a/transit/tcp/gossip.go b/transit/tcp/gossip.go index 1b17773b..aa5aed67 100644 --- a/transit/tcp/gossip.go +++ b/transit/tcp/gossip.go @@ -13,18 +13,71 @@ func (transporter *TCPTransporter) startGossipTimer() { transporter.gossipTimer = time.NewTicker(time.Second * time.Duration(transporter.options.GossipPeriod)) go func() { for range transporter.gossipTimer.C { - transporter.sendGossipRequest(false) + transporter.sendGossipRequest("") } }() } -func (transporter *TCPTransporter) sendGossipRequest(broadcast bool) { +func (transporter *TCPTransporter) getRandomNode(nodes []moleculer.Node) moleculer.Node { + if len(nodes) == 0 { + return nil + } + nonLocalNodes := []moleculer.Node{} + for _, node := range nodes { + if !node.IsLocal() { + nonLocalNodes = append(nonLocalNodes, node) + } + } + if len(nonLocalNodes) == 0 { + return nil + } + return nonLocalNodes[rand.Intn(len(nonLocalNodes))] +} - transporter.logger.Trace("Sending gossip request") +func (transporter *TCPTransporter) sendGossip(nodeID string, payload moleculer.Payload) { + node := transporter.registry.GetNodeByID(nodeID) + if !node.IsLocal() { + transporter.logger.Trace("Sending gossip request to "+node.GetID(), "payload:", util.PrettyPrintMap(payload.RawMap())) + transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_REQ), node.GetID(), payload) + } +} + +func (transporter *TCPTransporter) sendGossipHello(nodeID string) { + localNode := transporter.registry.GetLocalNode() + payload := payloadPkg.Empty() + payload.Add("sender", localNode.GetID()) + payload.Add("host", localNode.GetHost()) + payload.Add("port", localNode.GetPort()) + + transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_HELLO), nodeID, payload) +} + +func (transporter *TCPTransporter) onGossipHello(fromAddrss string, payload moleculer.Payload) { + sender := payload.Get("sender").String() + port := payload.Get("port").Int() + hostname := payload.Get("host").String() + transporter.logger.Debug("Received gossip hello from sender: ", sender, "ipAddress: ", fromAddrss, " hostname: ", hostname) + + node := transporter.registry.GetNodeByID(sender) + if node == nil { + transporter.logger.Debug("Unknown node. Register as offline node - sender: ", sender) + node = transporter.registry.AddOfflineNode(sender, hostname, fromAddrss, port) + } + if node.GetUdpAddress() == "" { + node.UpdateInfo(map[string]interface{}{ + "udpAddress": fromAddrss, + }) + } + node.Available() + transporter.logger.Trace("will send a gossip response to node: " + sender) + transporter.onGossipRequest(payloadPkg.Empty().Add("sender", sender)) +} + +func (transporter *TCPTransporter) sendGossipRequest(target string) { + transporter.logger.Trace("Sending gossip request") node := transporter.registry.GetLocalNode() node.UpdateMetrics() - onlineResponse := map[string]interface{}{} offlineResponse := map[string]interface{}{} onlineNodes := []moleculer.Node{} @@ -51,77 +104,47 @@ func (transporter *TCPTransporter) sendGossipRequest(broadcast bool) { } if len(onlineResponse) > 0 { - if broadcast { - transporter.broadcastGossipToNodes(payload, onlineNodes) + var targetNode moleculer.Node + if target != "" { + transporter.logger.Debug("target node is specified target:", target) + targetNode = transporter.registry.GetNodeByID(target) + } + if targetNode == nil { + targetNode = transporter.getRandomNode(onlineNodes) + if targetNode != nil { + transporter.logger.Trace("selected a random node to send gossip request - nodeId:", targetNode.GetID()) + } else { + transporter.logger.Trace("could not select a random node - online nodes size:", len(onlineNodes)) + } + } + if targetNode != nil { + transporter.sendGossip(targetNode.GetID(), payload) } else { - transporter.sendGossipToRandomEndpoint(payload, onlineNodes) + transporter.logger.Debug("No target node found for gossip request - target param:", target, " online nodes size:", len(onlineNodes)) } } if len(offlineNodes) > 0 { ratio := float64(len(offlineNodes)) / float64(len(onlineNodes)+1) if ratio >= 1 || rand.Float64() < ratio { - transporter.sendGossipToRandomEndpoint(payload, offlineNodes) + randomNode := transporter.getRandomNode(offlineNodes) + transporter.sendGossip(randomNode.GetID(), payload) } } } -func (transporter *TCPTransporter) broadcastGossipToNodes(payload moleculer.Payload, nodes []moleculer.Node) { - if len(nodes) == 0 { - return - } - for _, node := range nodes { - if !node.IsLocal() { - transporter.logger.Trace("Sending gossip request to "+node.GetID(), "payload:", payload) - transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_REQ), node.GetID(), payload) - } - } -} - -func (transporter *TCPTransporter) sendGossipToRandomEndpoint(payload moleculer.Payload, nodes []moleculer.Node) { - if len(nodes) == 0 { - return - } - node := nodes[rand.Intn(len(nodes))] - if !node.IsLocal() { - transporter.logger.Trace("Sending gossip request to "+node.GetID(), "payload:", payload) - transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_REQ), node.GetID(), payload) - } -} - -func (transporter *TCPTransporter) onGossipHello(fromAddrss string, msgBytes *[]byte) { - payload := transporter.serializer.BytesToPayload(msgBytes) - sender := payload.Get("sender").String() - port := payload.Get("port").Int() - hostname := payload.Get("host").String() - - transporter.logger.Debug("Received gossip hello from sender: ", sender, "ipAddress: ", fromAddrss, " hostname: ", hostname) - - node := transporter.registry.GetNodeByID(sender) - if node == nil { - transporter.logger.Debug("Unknown node. Register as offline node - sender: ", sender) - node = transporter.registry.AddOfflineNode(sender, hostname, fromAddrss, port) - } - if node.GetUdpAddress() == "" { - node.UpdateInfo(map[string]interface{}{ - "udpAddress": fromAddrss, - }) - } -} - -func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) { - payload := transporter.serializer.BytesToPayload(msgBytes) +func (transporter *TCPTransporter) onGossipRequest(payload moleculer.Payload) { sender := payload.Get("sender").String() - transporter.logger.Trace("Received gossip request from " + sender) + transporter.logger.Debug("onGossipRequest() - sender:" + sender) onlineResponse := map[string]interface{}{} offlineResponse := map[string]interface{}{} - transporter.registry.ForEachNode(func(node moleculer.Node) bool { + onlineMap := payload.Get("online") + offlineMap := payload.Get("offline") - onlineMap := payload.Get("online") - offlineMap := payload.Get("offline") + transporter.registry.ForEachNode(func(node moleculer.Node) bool { var seq int64 = 0 var cpuSeq int64 = 0 var cpu int64 = 0 @@ -145,16 +168,6 @@ func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) { } } - if node.IsLocal() { - node.UpdateMetrics() - info := node.ExportAsMap() - seq = node.GetSequence() - cpu = node.GetCpu() - cpuSeq = node.GetCpuSequence() - onlineResponse[node.GetID()] = []interface{}{info, node.GetCpuSequence(), node.GetCpu()} - // transporter.logger.Debug("Node is local - send back the node info and cpu, cpuSed to "+node.GetID(), " seq: ", seq, " cpuSeq: ", cpuSeq, " cpu: ", cpu, " info: ", util.PrettyPrintMap(info)) - } - if seq != 0 && seq < node.GetSequence() { transporter.logger.Debug("We have newer info or requester doesn't know it") if node.IsAvailable() { @@ -215,7 +228,7 @@ func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) { transporter.logger.Debug("CPU info updated for " + node.GetID()) } else if cpuSeq < node.GetCpuSequence() { // We have newer info, send back - onlineResponse[node.GetID()] = []interface{}{node.GetCpuSequence(), node.GetCpu()} + onlineResponse[node.GetID()] = []interface{}{node.ExportAsMap(), node.GetCpuSequence(), node.GetCpu()} transporter.logger.Debug("CPU info sent back to " + node.GetID()) } } else { @@ -227,26 +240,34 @@ func (transporter *TCPTransporter) onGossipRequest(msgBytes *[]byte) { return true }) + localNode := transporter.registry.GetLocalNode() + localNode.UpdateMetrics() + info := localNode.ExportAsMap() + onlineResponse[localNode.GetID()] = []interface{}{info, localNode.GetCpuSequence(), localNode.GetCpu()} + if len(onlineResponse) > 0 || len(offlineResponse) > 0 { sender := payload.Get("sender").String() responsePayload := payloadPkg. Empty(). - Add("online", onlineResponse). - Add("offline", offlineResponse). Add("sender", transporter.registry.GetLocalNode().GetID()) + if len(onlineResponse) > 0 { + responsePayload.Add("online", onlineResponse) + } + if len(offlineResponse) > 0 { + responsePayload.Add("offline", offlineResponse) + } + transporter.logger.Trace("Gossip response sent to "+sender, " payload:", util.PrettyPrintMap(responsePayload.RawMap())) transporter.Publish(msgTypeToCommand(PACKET_GOSSIP_RES), sender, responsePayload) - transporter.logger.Trace("Gossip response sent to " + sender) } else { transporter.logger.Trace("No response sent to " + sender) } } -func (transporter *TCPTransporter) onGossipResponse(msgBytes *[]byte) { - payload := transporter.serializer.BytesToPayload(msgBytes) +func (transporter *TCPTransporter) onGossipResponse(payload moleculer.Payload) { sender := payload.Get("sender").String() - transporter.logger.Trace("Received gossip response from " + sender) + transporter.logger.Trace("Received gossip response from "+sender, " payload:", util.PrettyPrintMap(payload.RawMap())) online := payload.Get("online") offline := payload.Get("offline") @@ -255,6 +276,7 @@ func (transporter *TCPTransporter) onGossipResponse(msgBytes *[]byte) { transporter.logger.Trace("Received online info from nodeID: " + sender) online.ForEach(func(key interface{}, value moleculer.Payload) bool { nodeID, ok := key.(string) + transporter.logger.Debug("Received online info from nodeID: " + nodeID) if !ok { transporter.logger.Error("Error parsing online nodeID") return true diff --git a/transit/tcp/tcp-reader.go b/transit/tcp/tcp-reader.go index 798912e3..fb3a3d83 100644 --- a/transit/tcp/tcp-reader.go +++ b/transit/tcp/tcp-reader.go @@ -19,6 +19,8 @@ const ( type OnMessageFunc func(fromAddrss string, msgType int, msgBytes *[]byte) +type OnConnectionFunc func(address, host string, port int) + type TcpReader struct { port int listener net.Listener @@ -27,16 +29,18 @@ type TcpReader struct { lock sync.Mutex state State maxPacketSize int + onConnection OnConnectionFunc onMessage OnMessageFunc disconnectNodeByAddress func(address string) } -func NewTcpReader(port int, onMessage OnMessageFunc, disconnectNodeByAddress func(address string), logger *log.Entry) *TcpReader { +func NewTcpReader(port int, onMessage OnMessageFunc, onConnection OnConnectionFunc, disconnectNodeByAddress func(address string), logger *log.Entry) *TcpReader { return &TcpReader{ port: port, sockets: make(map[net.Conn]bool), logger: logger, onMessage: onMessage, + onConnection: onConnection, disconnectNodeByAddress: disconnectNodeByAddress, } } @@ -85,12 +89,16 @@ func (r *TcpReader) Listen() (int, error) { func (r *TcpReader) handleConnection(conn net.Conn) { address := conn.RemoteAddr().String() - host, _, err := net.SplitHostPort(address) + host, port, err := net.SplitHostPort(address) if err != nil { r.logger.Error("Failed to split host and port - address:", address) } - r.logger.Debugf("New TCP client connected from '%s'\n", address) + postInt, err := strconv.Atoi(port) + if err != nil { + r.logger.Error("Failed to convert port to integer - port:", port) + } + r.onConnection(address, host, postInt) for err == nil { msgType, msgBytes, e := r.readMessage(conn) err = e @@ -104,7 +112,7 @@ func (r *TcpReader) handleConnection(conn net.Conn) { } break } - r.logger.Trace("handleConnection() message read from socket - msgType: ", msgType, "message:", string(msgBytes)) + r.onMessage(host, msgType, &msgBytes) } r.closeSocket(conn) diff --git a/transit/tcp/tcp-transporter.go b/transit/tcp/tcp-transporter.go index bb771a8b..8a573943 100644 --- a/transit/tcp/tcp-transporter.go +++ b/transit/tcp/tcp-transporter.go @@ -6,8 +6,10 @@ import ( "time" "github.com/moleculer-go/moleculer" + payloadPkg "github.com/moleculer-go/moleculer/payload" "github.com/moleculer-go/moleculer/serializer" "github.com/moleculer-go/moleculer/transit" + "github.com/moleculer-go/moleculer/util" log "github.com/sirupsen/logrus" ) @@ -107,14 +109,22 @@ const ( PACKET_GOSSIP_HELLO = 8 ) +func (transporter *TCPTransporter) onTcpConnection(fromAddrss string, host string, port int) { + node := transporter.registry.GetNodeByAddress(fromAddrss) + if node != nil { + payload := payloadPkg.Empty().Add("sender", node.GetID()) + transporter.onGossipRequest(payload) + } +} + func (transporter *TCPTransporter) onTcpMessage(fromAddrss string, msgType int, msgBytes *[]byte) { switch msgType { case PACKET_GOSSIP_HELLO: - transporter.onGossipHello(fromAddrss, msgBytes) + transporter.onGossipHello(fromAddrss, transporter.serializer.BytesToPayload(msgBytes)) case PACKET_GOSSIP_REQ: - transporter.onGossipRequest(msgBytes) + transporter.onGossipRequest(transporter.serializer.BytesToPayload(msgBytes)) case PACKET_GOSSIP_RES: - transporter.onGossipResponse(msgBytes) + transporter.onGossipResponse(transporter.serializer.BytesToPayload(msgBytes)) default: transporter.incomingMessage(msgType, msgBytes) } @@ -198,7 +208,7 @@ func (transporter *TCPTransporter) disconnectNodeByAddress(address string) { } func (transporter *TCPTransporter) startTcpServer() { - transporter.tcpReader = NewTcpReader(transporter.options.Port, transporter.onTcpMessage, transporter.disconnectNodeByAddress, transporter.logger.WithFields(log.Fields{ + transporter.tcpReader = NewTcpReader(transporter.options.Port, transporter.onTcpMessage, transporter.onTcpConnection, transporter.disconnectNodeByAddress, transporter.logger.WithFields(log.Fields{ "TCPTransporter": "TCPReader", })) transporter.tcpWriter = NewTcpWriter(transporter.options.MaxConnections, transporter.logger.WithFields(log.Fields{ @@ -258,15 +268,17 @@ func addIpToList(ipList []string, address string) []string { // TODO - check full lifecycle - this message creates or updates a node with ip address and port to connect to directly // need to find where the TCP connection step happens.. is not happening here - where is this node info used ? -func (transporter *TCPTransporter) onUdpMessage(nodeID, address string, port int) { +func (transporter *TCPTransporter) onUdpMessage(nodeID, host string, port int) { if nodeID != "" && nodeID != transporter.options.NodeId { - transporter.logger.Debug("UDP discovery received from " + address + " nodeId: " + nodeID + " port: " + string(port)) + transporter.logger.Debug("UDP discovery received from " + host + " nodeId: " + nodeID + " port: " + string(port)) node := transporter.registry.GetNodeByID(nodeID) if node == nil { transporter.logger.Debug("Unknown node. Register as offline node") - node = transporter.registry.AddOfflineNode(nodeID, address, address, port) + node = transporter.registry.AddOfflineNode(nodeID, host, host, port) + transporter.sendGossipHello(nodeID) + } else if !node.IsAvailable() { - ipList := addIpToList(node.GetIpList(), address) + ipList := addIpToList(node.GetIpList(), host) node.UpdateInfo(map[string]interface{}{ // "hostname": address, "port": port, @@ -274,7 +286,7 @@ func (transporter *TCPTransporter) onUdpMessage(nodeID, address string, port int }) } node.UpdateInfo(map[string]interface{}{ - "udpAddress": address, + "udpAddress": host, }) } } @@ -294,36 +306,22 @@ func (transporter *TCPTransporter) Disconnect() chan error { } func (transporter *TCPTransporter) Subscribe(command, nodeID string, handler transit.TransportHandler) { - // if commandToMsgType(command) == -1 { - // transporter.logger.Error("TCPTransporter.Subscribe() Invalid command: " + command) - // return - // } if _, ok := transporter.handlers[command]; !ok { transporter.handlers[command] = make([]transit.TransportHandler, 0) } transporter.handlers[command] = append(transporter.handlers[command], handler) } -func (transporter *TCPTransporter) getNodeAddress(node moleculer.Node) string { - if node.GetUdpAddress() != "" { - return node.GetUdpAddress() - } - if transporter.options.UseHostname && node.GetHostname() != "" { - return node.GetHostname() - } - if len(node.GetIpList()) > 0 { - return node.GetIpList()[0] - } - return "" -} - func (transporter *TCPTransporter) tryToConnect(nodeID string) error { node := transporter.registry.GetNodeByID(nodeID) if node == nil { transporter.logger.Error("TCPTransporter.tryToConnect() Unknown nodeID: " + nodeID) return errors.New("Unknown nodeID: " + nodeID) } - nodeAddress := transporter.getNodeAddress(node) + nodeAddress := node.GetHost() + if transporter.options.UseHostname && node.GetHostname() != "" { + nodeAddress = node.GetHostname() + } if nodeAddress == "" { transporter.logger.Error("TCPTransporter.tryToConnect() No address found for nodeID: " + nodeID) return errors.New("No address found for nodeID: " + nodeID) @@ -338,7 +336,6 @@ func (transporter *TCPTransporter) tryToConnect(nodeID string) error { } func (transporter *TCPTransporter) Publish(command, nodeID string, message moleculer.Payload) { - transporter.logger.Debug("TCPTransporter.Publish() command: " + command + " to nodeID: " + nodeID) if command == "DISCOVER" { if transporter.udpServer != nil { transporter.udpServer.BroadcastDiscoveryMessage() @@ -346,14 +343,14 @@ func (transporter *TCPTransporter) Publish(command, nodeID string, message molec return } if command == "INFO" { - transporter.sendGossipRequest(true) + transporter.sendGossipRequest(nodeID) return } if command == "HEARTBEAT" { - //how does the JS TCP transporter handle HEARTBEAT? - //prob done by the gossip protocol - already has a timer + //handled by the gossip protocol return } + transporter.logger.Trace("TCPTransporter.Publish() command: "+command+" to nodeID: "+nodeID, " message: ", util.PrettyPrintMap(message.RawMap())) msgType := commandToMsgType(command) if msgType == -1 {