diff --git a/broker/broker.go b/broker/broker.go index c6f0b0b6..26d53aaf 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -5,6 +5,7 @@ import ( "strings" "time" + "github.com/hashicorp/go-uuid" bus "github.com/moleculer-go/goemitter" "github.com/moleculer-go/moleculer" "github.com/moleculer-go/moleculer/cache" @@ -115,6 +116,8 @@ type ServiceBroker struct { id string + instanceID string + localNode moleculer.Node } @@ -566,6 +569,13 @@ func (broker *ServiceBroker) init() { broker.config = broker.middlewares.CallHandlers("Config", broker.config).(moleculer.Config) + instanceID, err := uuid.GenerateUUID() + if err != nil { + broker.logger.Error("Could not create an instance id - error ", err) + instanceID = "error creating instance id" + } + broker.instanceID = instanceID + broker.delegates = broker.createDelegates() broker.registry = registry.CreateRegistry(broker.id, broker.delegates) broker.localNode = broker.registry.LocalNode() @@ -580,6 +590,9 @@ func (broker *ServiceBroker) createDelegates() *moleculer.BrokerDelegates { Bus: broker.LocalBus, IsStarted: broker.IsStarted, Config: broker.config, + InstanceID: func() string { + return broker.instanceID + }, ActionDelegate: func(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload { return broker.registry.LoadBalanceCall(context, opts...) }, diff --git a/context/contextFactory.go b/context/contextFactory.go index 6ee06243..3a4572ad 100644 --- a/context/contextFactory.go +++ b/context/contextFactory.go @@ -26,6 +26,7 @@ type Context struct { meta moleculer.Payload timeout int level int + caller string } func BrokerContext(broker *moleculer.BrokerDelegates) moleculer.BrokerContext { @@ -46,7 +47,7 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay parentContext := context meta := parentContext.meta if context.broker.Config.Metrics { - meta = meta.Add("metrics", true) + meta = meta.Add("tracing", true) } id := util.RandomString(12) var requestID string @@ -55,6 +56,10 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay } else { requestID = id } + caller := parentContext.actionName + if parentContext.eventName != "" { + caller = parentContext.eventName + } eventContext := Context{ id: id, requestID: requestID, @@ -66,6 +71,7 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay level: parentContext.level + 1, meta: meta, parentID: parentContext.id, + caller: caller, } return &eventContext } @@ -80,7 +86,7 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P parentContext := context meta := parentContext.meta if context.broker.Config.Metrics { - meta = meta.Add("metrics", true) + meta = meta.Add("tracing", true) } if len(opts) > 0 && opts[0].Meta != nil && opts[0].Meta.Len() > 0 { meta = meta.AddMany(opts[0].Meta.RawMap()) @@ -92,6 +98,10 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P } else { requestID = id } + caller := parentContext.actionName + if parentContext.eventName != "" { + caller = parentContext.eventName + } actionContext := Context{ id: id, requestID: requestID, @@ -101,15 +111,11 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P level: parentContext.level + 1, meta: meta, parentID: parentContext.id, + caller: caller, } return &actionContext } -// Max calling level check to avoid calling loops -func checkMaxCalls(context *Context) { - -} - // ActionContext create an action context for remote call. func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interface{}) moleculer.BrokerContext { var level int @@ -130,6 +136,10 @@ func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interfac parentID = s } } + // params := payload.Empty() + // if values["params"] != nil { + // params = payload.New(values["params"]) + // } params := payload.New(values["params"]) if values["timeout"] != nil { @@ -210,20 +220,22 @@ func (context *Context) RequestID() string { func (context *Context) AsMap() map[string]interface{} { mapResult := make(map[string]interface{}) - var metrics bool - if context.meta.Get("metrics").Exists() { - metrics = context.meta.Get("metrics").Bool() + var tracing bool + if context.meta.Get("tracing").Exists() { + tracing = context.meta.Get("tracing").Bool() } mapResult["id"] = context.id mapResult["requestID"] = context.requestID mapResult["level"] = context.level + mapResult["meta"] = context.meta.RawMap() + mapResult["caller"] = context.caller + mapResult["tracing"] = tracing + mapResult["parentID"] = context.parentID + if context.actionName != "" { mapResult["action"] = context.actionName - mapResult["metrics"] = metrics - mapResult["parentID"] = context.parentID - mapResult["meta"] = context.meta.RawMap() mapResult["timeout"] = context.timeout mapResult["params"] = context.params.Value() } @@ -232,11 +244,12 @@ func (context *Context) AsMap() map[string]interface{} { mapResult["groups"] = context.groups mapResult["broadcast"] = context.broadcast mapResult["data"] = context.params.Value() - mapResult["meta"] = context.meta.RawMap() + mapResult["level"] = context.level } - //TODO : check how to support streaming params in go + //streaming not supported yet mapResult["stream"] = false + //mapResult["seq"] = 0 // for stream payloads return mapResult } @@ -323,3 +336,7 @@ func (context *Context) Logger() *log.Entry { } return context.broker.Logger("context", "") } + +func (context *Context) Caller() string { + return context.caller +} diff --git a/context/context_test.go b/context/context_test.go index e209dd28..69850da8 100644 --- a/context/context_test.go +++ b/context/context_test.go @@ -33,7 +33,7 @@ var _ = g.Describe("Context", func() { "meta": map[string]interface{}{}, }) Expect(actionContext).ShouldNot(BeNil()) - Expect(len(actionContext.AsMap())).Should(Equal(10)) + Expect(len(actionContext.AsMap())).Should(Equal(11)) Expect(actionContext.ActionName()).Should(Equal("action")) Expect(actionContext.Payload()).Should(Equal(payload.Empty())) Expect(actionContext.ID()).Should(Equal("id")) @@ -47,7 +47,26 @@ var _ = g.Describe("Context", func() { "parentID": "parentID", "params": map[string]interface{}{}, }) - }).Should(Panic()) + }).Should(Panic()) //no action + }) + + g.It("Should create an action context with no params", func() { + delegates := test.DelegatesWithIdAndConfig("x", moleculer.Config{}) + actionContext := ActionContext(delegates, map[string]interface{}{ + "sender": "test", + "id": "id", + "action": "action", + "level": 2, + "timeout": 20, + "parentID": "parentID", + "meta": map[string]interface{}{}, + }) + Expect(actionContext).ShouldNot(BeNil()) + Expect(len(actionContext.AsMap())).Should(Equal(11)) + Expect(actionContext.ActionName()).Should(Equal("action")) + Expect(actionContext.Payload()).Should(Equal(payload.New(nil))) + Expect(actionContext.ID()).Should(Equal("id")) + Expect(actionContext.Logger()).ShouldNot(BeNil()) }) g.It("Should call SetTargetNodeID", func() { @@ -81,7 +100,7 @@ var _ = g.Describe("Context", func() { }) Expect(eventContext).ShouldNot(BeNil()) Expect(eventContext.IsBroadcast()).Should(BeTrue()) - Expect(len(eventContext.AsMap())).Should(Equal(9)) + Expect(len(eventContext.AsMap())).Should(Equal(12)) Expect(eventContext.EventName()).Should(Equal("event")) Expect(eventContext.Groups()).Should(Equal([]string{"a", "b"})) Expect(eventContext.Payload()).Should(Equal(payload.Empty())) @@ -96,7 +115,7 @@ var _ = g.Describe("Context", func() { }).Should(Panic()) }) - g.It("Should create a child context with metrics on", func() { + g.It("Should create a child context with tracing on", func() { config := moleculer.Config{ Metrics: true, @@ -104,12 +123,12 @@ var _ = g.Describe("Context", func() { brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config)) actionContext := brokerContext.ChildActionContext("actionx", nil) Expect(actionContext.Meta()).ShouldNot(BeNil()) - Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue()) + Expect(actionContext.Meta().Get("tracing").Bool()).Should(BeTrue()) eventContext := brokerContext.ChildEventContext("eventx", nil, nil, false) Expect(eventContext.Meta()).ShouldNot(BeNil()) Expect(eventContext.RequestID()).ShouldNot(Equal("")) - Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue()) + Expect(actionContext.Meta().Get("tracing").Bool()).Should(BeTrue()) config = moleculer.Config{ Metrics: false, @@ -117,11 +136,11 @@ var _ = g.Describe("Context", func() { brokerContext = BrokerContext(test.DelegatesWithIdAndConfig("nodex", config)) actionContext = brokerContext.ChildActionContext("actionx", nil) Expect(actionContext.Meta()).ShouldNot(BeNil()) - Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse()) + Expect(actionContext.Meta().Get("tracing").Exists()).Should(BeFalse()) eventContext = brokerContext.ChildEventContext("eventx", nil, nil, false) Expect(eventContext.Meta()).ShouldNot(BeNil()) - Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse()) + Expect(actionContext.Meta().Get("tracing").Exists()).Should(BeFalse()) }) @@ -188,4 +207,18 @@ var _ = g.Describe("Context", func() { Expect(called).Should(BeTrue()) }) + g.It("Should create a child context with proper caller setting", func() { + + config := moleculer.Config{ + Metrics: true, + } + brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config)) + + actionAContext := brokerContext.ChildActionContext("servicex.action_a", nil) + + actionBContext := actionAContext.ChildActionContext("servicex.action_b", nil) + + Expect(actionBContext.Caller()).Should(Equal("servicex.action_a")) + }) + }) diff --git a/go.mod b/go.mod index e779cad5..dfbf211e 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/gogo/protobuf v1.2.1 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect + github.com/hashicorp/go-uuid v1.0.2 // indirect github.com/hashicorp/raft v1.0.1 // indirect github.com/lib/pq v1.1.1 // indirect github.com/moleculer-go/cupaloy/v2 v2.5.2 diff --git a/go.sum b/go.sum index 412ef60d..71146b04 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/metrics/metrics.go b/metrics/metrics.go index 59c6c18c..ad8983db 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -64,7 +64,7 @@ func metricsPayload(brokerContext moleculer.BrokerContext) map[string]interface{ func createShouldMetric(Config moleculer.Config) func(context moleculer.BrokerContext) bool { var callsCount float32 = 0 return func(context moleculer.BrokerContext) bool { - if context.Meta().Get("metrics").Bool() { + if context.Meta().Get("tracing").Bool() { callsCount++ if callsCount*Config.MetricsRate >= 1.0 { diff --git a/moleculer.go b/moleculer.go index d9b98e8e..68d3fcfb 100644 --- a/moleculer.go +++ b/moleculer.go @@ -171,7 +171,7 @@ var DefaultConfig = Config{ RetryPolicy: RetryPolicy{ Enabled: false, }, - RequestTimeout: 1 * time.Minute, + RequestTimeout: 3 * time.Second, MCallTimeout: 5 * time.Second, WaitForNeighboursInterval: 200 * time.Millisecond, } @@ -203,6 +203,7 @@ type LoggerFunc func(name string, value string) *log.Entry type BusFunc func() *bus.Emitter type isStartedFunc func() bool type LocalNodeFunc func() Node +type InstanceIDFunc func() string type ActionDelegateFunc func(context BrokerContext, opts ...Options) chan Payload type EmitEventFunc func(context BrokerContext) type ServiceForActionFunc func(string) []*ServiceSchema @@ -261,6 +262,7 @@ type BrokerContext interface { Payload() Payload Groups() []string IsBroadcast() bool + Caller() string //export context info in a map[string] AsMap() map[string]interface{} @@ -280,6 +282,7 @@ type BrokerContext interface { //Needs Refactoring..2 broker interfaces.. one for regiwstry.. and for for all others. type BrokerDelegates struct { + InstanceID InstanceIDFunc LocalNode LocalNodeFunc Logger LoggerFunc Bus BusFunc diff --git a/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listcpuBroker b/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listcpuBroker index 2ce4f223..8104e041 100644 --- a/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listcpuBroker +++ b/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listcpuBroker @@ -1,5 +1,5 @@ (map[string]map[string]interface {}) (len=3) { - (string) (len=13) "nodeCpuBroker": (map[string]interface {}) (len=8) { + (string) (len=13) "nodeCpuBroker": (map[string]interface {}) (len=9) { (string) (len=2) "id": (string) (len=14) "node_cpuBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -13,9 +13,11 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=9) "available": (bool) true }, - (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=8) { + (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=9) { (string) (len=2) "id": (string) (len=18) "node_printerBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -29,9 +31,11 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=9) "available": (bool) true }, - (string) (len=17) "nodeScannerBroker": (map[string]interface {}) (len=8) { + (string) (len=17) "nodeScannerBroker": (map[string]interface {}) (len=9) { (string) (len=2) "id": (string) (len=18) "node_scannerBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -45,6 +49,8 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=9) "available": (bool) true } } diff --git a/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listprinterBroker b/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listprinterBroker index b1932c15..2d00ad71 100644 --- a/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listprinterBroker +++ b/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listprinterBroker @@ -1,6 +1,6 @@ (map[string]map[string]interface {}) (len=3) { (string) (len=13) "nodeCpuBroker": (map[string]interface {}) , - (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=8) { + (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=9) { (string) (len=2) "id": (string) (len=18) "node_printerBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -14,6 +14,8 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=9) "available": (bool) true }, (string) (len=17) "nodeScannerBroker": (map[string]interface {}) diff --git a/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listscannerBroker b/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listscannerBroker index 063780ca..72837a71 100644 --- a/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listscannerBroker +++ b/registry/.snapshots/registry_test-glob--func3-1-1-1-no-services-$node.listscannerBroker @@ -1,6 +1,6 @@ (map[string]map[string]interface {}) (len=3) { (string) (len=13) "nodeCpuBroker": (map[string]interface {}) , - (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=8) { + (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=9) { (string) (len=2) "id": (string) (len=18) "node_printerBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -14,9 +14,11 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=9) "available": (bool) true }, - (string) (len=17) "nodeScannerBroker": (map[string]interface {}) (len=8) { + (string) (len=17) "nodeScannerBroker": (map[string]interface {}) (len=9) { (string) (len=2) "id": (string) (len=18) "node_scannerBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -30,6 +32,8 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=9) "available": (bool) true } } diff --git a/registry/.snapshots/registry_test-glob--func3-1-1-1-with-services-$node.listprinterBroker b/registry/.snapshots/registry_test-glob--func3-1-1-1-with-services-$node.listprinterBroker index e04ff316..edb3502e 100644 --- a/registry/.snapshots/registry_test-glob--func3-1-1-1-with-services-$node.listprinterBroker +++ b/registry/.snapshots/registry_test-glob--func3-1-1-1-with-services-$node.listprinterBroker @@ -1,6 +1,6 @@ (map[string]map[string]interface {}) (len=3) { (string) (len=13) "nodeCpuBroker": (map[string]interface {}) , - (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=9) { + (string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=10) { (string) (len=2) "id": (string) (len=18) "node_printerBroker", (string) (len=3) "cpu": (int64) 0, (string) (len=3) "seq": (string) (len=7) "removed", @@ -14,6 +14,8 @@ (string) (len=13) "100.100.0.100" }, (string) (len=8) "hostname": (string) (len=7) "removed", + (string) (len=8) "metadata": (map[string]interface {}) { + }, (string) (len=8) "services": ([]map[string]interface {}) (len=2) { (map[string]interface {}) (len=7) { (string) (len=4) "name": (string) (len=5) "$node", diff --git a/registry/node.go b/registry/node.go index 43b2d84b..8c93b287 100644 --- a/registry/node.go +++ b/registry/node.go @@ -154,6 +154,8 @@ func (node *Node) ExportAsMap() map[string]interface{} { resultMap["cpu"] = node.cpu resultMap["cpuSeq"] = node.cpuSequence resultMap["available"] = node.IsAvailable() + resultMap["metadata"] = make(map[string]interface{}) + return resultMap } diff --git a/registry/registry_test.go b/registry/registry_test.go index 36828acb..0de31e5f 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -277,6 +277,7 @@ var _ = Describe("Registry", func() { transport := memory.Create(log.WithField("transport", "memory"), mem) return &transport }, + RequestTimeout: 1 * time.Second, }) stageBroker := broker.New(&moleculer.Config{ @@ -287,6 +288,7 @@ var _ = Describe("Registry", func() { transport := memory.Create(log.WithField("transport", "memory"), mem) return &transport }, + RequestTimeout: 1 * time.Second, }) stage2Broker := broker.New(&moleculer.Config{ @@ -297,6 +299,7 @@ var _ = Describe("Registry", func() { transport := memory.Create(log.WithField("transport", "memory"), mem) return &transport }, + RequestTimeout: 1 * time.Second, }) //alarm service - prints the alarm and return the namespace :) diff --git a/serializer/jsonSerializer.go b/serializer/jsonSerializer.go index c0d3cd61..6b197283 100644 --- a/serializer/jsonSerializer.go +++ b/serializer/jsonSerializer.go @@ -31,8 +31,8 @@ func CreateJSONSerializer(logger *log.Entry) JSONSerializer { return JSONSerializer{logger} } -// mapToContext make sure all value types are compatible with the context fields. -func (serializer JSONSerializer) contextMap(values map[string]interface{}) map[string]interface{} { +// cleanContextMap make sure all value types are compatible with the context fields. +func (serializer JSONSerializer) cleanContextMap(values map[string]interface{}) map[string]interface{} { if values["level"] != nil { values["level"] = int(values["level"].(float64)) } @@ -249,7 +249,7 @@ func (serializer JSONSerializer) MapToPayload(mapValue *map[string]interface{}) } func (serializer JSONSerializer) PayloadToContextMap(message moleculer.Payload) map[string]interface{} { - return serializer.contextMap(message.RawMap()) + return serializer.cleanContextMap(message.RawMap()) } func (jp JSONPayload) Get(path string, defaultValue ...interface{}) moleculer.Payload { diff --git a/transit/nats/nats_test.go b/transit/nats/nats_test.go index cfc6424b..6576e22d 100644 --- a/transit/nats/nats_test.go +++ b/transit/nats/nats_test.go @@ -89,14 +89,16 @@ var _ = Describe("NATS Streaming Transit", func() { p := (<-profileBroker.Call("user.update", longList)) Expect(p.Error()).Should(Succeed()) userBroker.Stop() - Expect((<-profileBroker.Call("user.update", longList)).IsError()).Should(BeTrue()) + + r := <-profileBroker.Call("user.update", longList) + Expect(r.IsError()).Should(BeTrue()) profileBroker.Stop() }) }) Describe("Start / Stop Cycles.", func() { - logLevel := "fatal" + logLevel := "error" numberOfLoops := 5 loopNumber := 0 Measure("Creation of multiple brokers with connect/disconnect cycles running on nats transporter.", func(bench Benchmarker) { diff --git a/transit/pubsub/pubsub.go b/transit/pubsub/pubsub.go index 944b2eda..4867e9b3 100644 --- a/transit/pubsub/pubsub.go +++ b/transit/pubsub/pubsub.go @@ -17,6 +17,7 @@ import ( "github.com/moleculer-go/moleculer/transit" "github.com/moleculer-go/moleculer/transit/memory" "github.com/moleculer-go/moleculer/transit/nats" + "github.com/moleculer-go/moleculer/util" "github.com/moleculer-go/moleculer" "github.com/moleculer-go/moleculer/serializer" @@ -39,6 +40,11 @@ type PubSub struct { brokerStarted bool } +const DATATYPE_UNDEFINED = 0 +const DATATYPE_NULL = 1 +const DATATYPE_JSON = 2 +const DATATYPE_BUFFER = 3 + func (pubsub *PubSub) onServiceAdded(values ...interface{}) { if pubsub.isConnected && pubsub.brokerStarted { localNodeID := pubsub.broker.LocalNode().GetID() @@ -302,6 +308,11 @@ func (pubsub *PubSub) Emit(context moleculer.BrokerContext) { payload := context.AsMap() payload["sender"] = pubsub.broker.LocalNode().GetID() payload["ver"] = version.MoleculerProtocol() + if context.Payload().Exists() { + payload["dataType"] = DATATYPE_JSON + } else { + payload["dataType"] = DATATYPE_NULL + } pubsub.logger.Trace("Emit() targetNodeID: ", targetNodeID, " payload: ", payload) @@ -322,6 +333,11 @@ func (pubsub *PubSub) Request(context moleculer.BrokerContext) chan moleculer.Pa payload := context.AsMap() payload["sender"] = pubsub.broker.LocalNode().GetID() payload["ver"] = version.MoleculerProtocol() + if context.Payload().Exists() { + payload["paramsType"] = DATATYPE_JSON + } else { + payload["paramsType"] = DATATYPE_NULL + } pubsub.logger.Trace("Request() targetNodeID: ", targetNodeID, " payload: ", payload) @@ -445,6 +461,12 @@ func (pubsub *PubSub) sendResponse(context moleculer.BrokerContext, response mol values["id"] = context.ID() values["meta"] = context.Meta() + if response.Exists() { + values["dataType"] = DATATYPE_JSON + } else { + values["dataType"] = DATATYPE_NULL + } + if response.IsError() { var errMap map[string]string actionError, isActionError := response.Value().(ActionError) @@ -483,12 +505,28 @@ type ActionError interface { Stack() string } +func parseParamsType(value moleculer.Payload) string { + if !value.Exists() { + return "1" //default : null + } + return value.String() +} + // requestHandler : handles when a request arrives on this node. // 1: create a moleculer.Context from the message, the moleculer.Context contains the target action // 2: invoke the action // 3: send a response func (pubsub *PubSub) requestHandler() transit.TransportHandler { return func(message moleculer.Payload) { + paramsType := parseParamsType(message.Get("paramsType")) + if paramsType != "1" && paramsType != "2" { + errMsg := "Expecting paramsType == 2 (JSON) or 1 (Null) - received: " + paramsType + pubsub.logger.Error(errMsg) + //currently there is only one serializer implementation. + //once more serializers are added, pubsub.serializer must change and be dinamic based on paramsType + pubsub.sendResponse(context.ActionContext(pubsub.broker, nil), payload.Error(errMsg)) + return + } values := pubsub.serializer.PayloadToContextMap(message) context := context.ActionContext(pubsub.broker, values) result := <-pubsub.broker.ActionDelegate(context) @@ -526,12 +564,23 @@ func (pubsub *PubSub) neighbours() int64 { return int64(len(pubsub.knownNeighbours)) } +func configToMap(config moleculer.Config) map[string]string { + m := make(map[string]string) + m["logLevel"] = config.LogLevel + m["transporter"] = config.Transporter + m["namespace"] = config.Namespace + m["requestTimeout"] = config.RequestTimeout.String() + return m +} + // broadcastNodeInfo send the local node info to the target node, if empty to all nodes. func (pubsub *PubSub) broadcastNodeInfo(targetNodeID string) { payload := pubsub.broker.LocalNode().ExportAsMap() payload["sender"] = payload["id"] payload["neighbours"] = pubsub.neighbours() payload["ver"] = version.MoleculerProtocol() + payload["config"] = configToMap(pubsub.broker.Config) + payload["instanceID"] = pubsub.broker.InstanceID() message, _ := pubsub.serializer.MapToPayload(&payload) pubsub.transport.Publish("INFO", targetNodeID, message) @@ -563,6 +612,7 @@ func (pubsub *PubSub) SendPing() { ping["sender"] = sender ping["ver"] = version.MoleculerProtocol() ping["time"] = time.Now().Unix() + ping["id"] = util.RandomString(12) pingMessage, _ := pubsub.serializer.MapToPayload(&ping) pubsub.transport.Publish("PING", sender, pingMessage) @@ -576,6 +626,7 @@ func (pubsub *PubSub) pingHandler() transit.TransportHandler { pong["ver"] = version.MoleculerProtocol() pong["time"] = message.Get("time").Int() pong["arrived"] = time.Now().Unix() + pong["id"] = util.RandomString(12) pongMessage, _ := pubsub.serializer.MapToPayload(&pong) pubsub.transport.Publish("PONG", sender, pongMessage) @@ -594,6 +645,7 @@ func (pubsub *PubSub) pongHandler() transit.TransportHandler { mapValue["nodeID"] = message.Get("sender").String() mapValue["elapsedTime"] = elapsed mapValue["timeDiff"] = timeDiff + mapValue["id"] = message.Get("id").String() pubsub.broker.Bus().EmitAsync("$node.pong", []interface{}{mapValue}) } diff --git a/transit/pubsub/pubsub_test.go b/transit/pubsub/pubsub_test.go index b8427ba7..527983d9 100644 --- a/transit/pubsub/pubsub_test.go +++ b/transit/pubsub/pubsub_test.go @@ -39,6 +39,9 @@ var _ = Describe("PubSub Internals", func() { LocalNode: func() moleculer.Node { return &localNode }, + InstanceID: func() string { + return "instance-id" + }, }, transport: mockT, brokerStarted: true, @@ -77,6 +80,9 @@ var _ = Describe("PubSub Internals", func() { LocalNode: func() moleculer.Node { return &localNode }, + InstanceID: func() string { + return "instance-id" + }, }, transport: mockT, } diff --git a/version/version.go b/version/version.go index 5fab0a2c..49915430 100644 --- a/version/version.go +++ b/version/version.go @@ -6,7 +6,7 @@ func Moleculer() string { } func MoleculerProtocol() string { - return "3" + return "4" } func Go() string {