From fa770e6794edc88bfb1db9010054f701922eab05 Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 13 Apr 2020 23:01:04 +1200 Subject: [PATCH 01/14] feat (payload.MapOver) transform each item of the list --- moleculer.go | 1 + payload/payload.go | 12 ++++++++++++ serializer/jsonSerializer.go | 12 ++++++++++++ 3 files changed, 25 insertions(+) diff --git a/moleculer.go b/moleculer.go index eecbe2d7..96c1a348 100644 --- a/moleculer.go +++ b/moleculer.go @@ -58,6 +58,7 @@ type Payload interface { IsArray() bool IsMap() bool ForEach(iterator func(key interface{}, value Payload) bool) + MapOver(tranform func(in Payload) Payload) Payload } // ActionSchema is used by the validation engine to check if parameters sent to the action are valid. diff --git a/payload/payload.go b/payload/payload.go index 95042641..7641cf74 100644 --- a/payload/payload.go +++ b/payload/payload.go @@ -227,6 +227,18 @@ func (p *RawPayload) Array() []moleculer.Payload { return nil } +func (p *RawPayload) MapOver(transform func(in moleculer.Payload) moleculer.Payload) moleculer.Payload { + if p.IsArray() { + list := []moleculer.Payload{} + for _, value := range p.Array() { + list = append(list, transform(value)) + } + return New(list) + } else { + return Error("payload.MapOver can only deal with array payloads.") + } +} + func (p *RawPayload) ForEach(iterator func(key interface{}, value moleculer.Payload) bool) { if p.IsArray() { list := p.Array() diff --git a/serializer/jsonSerializer.go b/serializer/jsonSerializer.go index fce7d1e1..557184e8 100644 --- a/serializer/jsonSerializer.go +++ b/serializer/jsonSerializer.go @@ -505,6 +505,18 @@ func (payload JSONPayload) ForEach(iterator func(key interface{}, value molecule }) } +func (p JSONPayload) MapOver(transform func(in moleculer.Payload) moleculer.Payload) moleculer.Payload { + if p.IsArray() { + list := []moleculer.Payload{} + for _, value := range p.Array() { + list = append(list, transform(value)) + } + return payload.New(list) + } else { + return payload.Error("payload.MapOver can only deal with array payloads.") + } +} + func (payload JSONPayload) Bool() bool { return payload.result.Bool() } From f41d9323e374466e5e1fb3bd668c70142c2734cf Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 14 Apr 2020 12:36:06 +1200 Subject: [PATCH 02/14] feat (namespace) registry, broker and pubsub --- broker/broker.go | 4 ++ registry/registry.go | 10 ++++- registry/registry_test.go | 92 +++++++++++++++++++++++++++++++++++++++ transit/pubsub/pubsub.go | 9 +++- 4 files changed, 112 insertions(+), 3 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 2f5e7536..547f3fe5 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -78,6 +78,10 @@ func mergeConfigs(baseConfig moleculer.Config, userConfig []*moleculer.Config) m if config.RequestTimeout != 0 { baseConfig.RequestTimeout = config.RequestTimeout } + + if config.Namespace != "" { + baseConfig.Namespace = config.Namespace + } } } return baseConfig diff --git a/registry/registry.go b/registry/registry.go index 493efde5..d10da431 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -39,6 +39,7 @@ type ServiceRegistry struct { offlineCheckFrequency time.Duration offlineTimeout time.Duration nodeReceivedMutex *sync.Mutex + namespace string } // createTransit create a transit instance based on the config. @@ -80,6 +81,7 @@ func CreateRegistry(nodeID string, broker *moleculer.BrokerDelegates) *ServiceRe offlineTimeout: config.OfflineTimeout, stopping: false, nodeReceivedMutex: &sync.Mutex{}, + namespace: config.Namespace, } registry.logger.Debug("Service Registry created for broker: ", nodeID) @@ -256,11 +258,15 @@ func (registry *ServiceRegistry) BroadcastEvent(context moleculer.BrokerContext) func (registry *ServiceRegistry) LoadBalanceCall(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload { actionName := context.ActionName() params := context.Payload() - registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " opts: ", opts) + + registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " namespace: ", registry.namespace, " opts: ", opts) actionEntry := registry.nextAction(actionName, registry.strategy, opts...) if actionEntry == nil { - msg := fmt.Sprint("Registry - endpoint not found for actionName: ", actionName) + msg := "Registry - endpoint not found for actionName: " + actionName + if registry.namespace != "" { + msg = msg + " namespace: " + registry.namespace + } registry.logger.Error(msg) resultChan := make(chan moleculer.Payload, 1) resultChan <- payload.Error(msg) diff --git a/registry/registry_test.go b/registry/registry_test.go index 24a97175..71a6f25a 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -265,4 +265,96 @@ var _ = Describe("Registry", func() { close(done) }, 3) }) + + Describe("Namespace", func() { + + It("Services across namespaces cannos see each other", func(done Done) { + + mem := &memory.SharedMemory{} + + devBroker := broker.New(&moleculer.Config{ + DiscoverNodeID: func() string { return "node1_devBroker" }, + LogLevel: logLevel, + Namespace: "dev", + TransporterFactory: func() interface{} { + transport := memory.Create(log.WithField("transport", "memory"), mem) + return &transport + }, + }) + + stageBroker := broker.New(&moleculer.Config{ + DiscoverNodeID: func() string { return "node1_stageBroker" }, + LogLevel: logLevel, + Namespace: "stage", + TransporterFactory: func() interface{} { + transport := memory.Create(log.WithField("transport", "memory"), mem) + return &transport + }, + }) + + //alarm service - prints the alarm and return the namespace :) + alarmService := func(namemspace string) moleculer.ServiceSchema { + return moleculer.ServiceSchema{ + Name: "alarm", + Actions: []moleculer.Action{ + { + Name: "bell", + Handler: func(context moleculer.Context, params moleculer.Payload) interface{} { + context.Logger().Info("alarm.bell ringing !!! namemspace: ", namemspace) + return namemspace + }, + }, + }, + } + } + + devOnlyService := moleculer.ServiceSchema{ + Name: "good", + Actions: []moleculer.Action{ + { + Name: "code", + Handler: func(context moleculer.Context, params moleculer.Payload) interface{} { + context.Logger().Info("nice code :)") + return "🧠" + }, + }, + }, + } + + devBroker.Publish(alarmService("dev")) + devBroker.Publish(devOnlyService) + devBroker.Start() + + stageBroker.Start() + + devAlarm := <-devBroker.Call("alarm.bell", nil) + Expect(devAlarm.IsError()).Should(BeFalse()) + Expect(devAlarm.String()).Should(Equal("dev")) + + code := <-devBroker.Call("good.code", nil) + Expect(code.IsError()).Should(BeFalse()) + Expect(code.String()).Should(Equal("🧠")) + + time.Sleep(time.Millisecond) + + //alarm.bell should not be accessible to the stage broker + stageAlarm := <-stageBroker.Call("alarm.bell", nil) + Expect(stageAlarm.IsError()).Should(BeTrue()) + Expect(stageAlarm.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: alarm.bell namespace: stage")) + + stageBroker.Publish(alarmService("stage")) + stageAlarm = <-stageBroker.Call("alarm.bell", nil) + Expect(stageAlarm.IsError()).Should(BeFalse()) + Expect(stageAlarm.String()).Should(Equal("stage")) + + code = <-stageBroker.Call("good.code", nil) + Expect(code.IsError()).Should(BeTrue()) + Expect(code.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: good.code namespace: stage")) + + devBroker.Stop() + stageBroker.Stop() + + close(done) + }, 2) + }) }) diff --git a/transit/pubsub/pubsub.go b/transit/pubsub/pubsub.go index 70e9165c..f78e4ca4 100644 --- a/transit/pubsub/pubsub.go +++ b/transit/pubsub/pubsub.go @@ -167,12 +167,19 @@ func (pubsub *PubSub) createTransport() transit.Transport { pubsub.logger.Info("Transporter: Memory") transport = pubsub.createMemoryTransporter() } - transport.SetPrefix("MOL") + transport.SetPrefix(resolveNamespace(pubsub.broker.Config.Namespace)) transport.SetNodeID(pubsub.broker.LocalNode().GetID()) transport.SetSerializer(pubsub.serializer) return transport } +func resolveNamespace(namespace string) string { + if namespace != "" { + return "MOL-" + namespace + } + return "MOL" +} + func (pubsub *PubSub) createMemoryTransporter() transit.Transport { pubsub.logger.Debug("createMemoryTransporter() ... ") logger := pubsub.logger.WithField("transport", "memory") From c39ffcbcb060cbc482e8961c7b2658691d5d4455 Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 14 Apr 2020 14:04:38 +1200 Subject: [PATCH 03/14] fix (pubsub) clean up stan clientID before connection --- transit/nats/stan.go | 1 - transit/pubsub/pubsub.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/transit/nats/stan.go b/transit/nats/stan.go index 9501263d..6fb16565 100644 --- a/transit/nats/stan.go +++ b/transit/nats/stan.go @@ -53,7 +53,6 @@ func (transporter *StanTransporter) Connect() chan error { connection, err := stan.Connect(transporter.clusterID, transporter.clientID, stan.NatsURL(transporter.url)) if err != nil { transporter.logger.Error("STAN Connect() - Error: ", err, " clusterID: ", transporter.clusterID, " clientID: ", transporter.clientID) - //panic("Error trying to connect to stan server. url: " + transporter.url + " clusterID: " + transporter.clusterID + " clientID: " + transporter.clientID + " -> Stan error: " + error.Error()) endChan <- err return } diff --git a/transit/pubsub/pubsub.go b/transit/pubsub/pubsub.go index f78e4ca4..944b2eda 100644 --- a/transit/pubsub/pubsub.go +++ b/transit/pubsub/pubsub.go @@ -202,18 +202,18 @@ func (pubsub *PubSub) createNatsTransporter() transit.Transport { } func (pubsub *PubSub) createStanTransporter() transit.Transport { - //TODO: move this to config and params broker := pubsub.broker + logger := broker.Logger("transport", "stan") + url := "stan://" + os.Getenv("STAN_HOST") + ":4222" clusterID := "test-cluster" - localNodeID := broker.LocalNode().GetID() - logger := broker.Logger("transport", "stan") + clientID := strings.ReplaceAll(localNodeID, ".", "_") options := nats.StanOptions{ url, clusterID, - localNodeID, + clientID, logger, pubsub.serializer, func(message moleculer.Payload) bool { From 692831f30dbf4813865eff5aefa30db081c1136f Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 15 Apr 2020 11:50:18 +1200 Subject: [PATCH 04/14] feat (namespace) add positive test scenario --- registry/registry_test.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/registry/registry_test.go b/registry/registry_test.go index 71a6f25a..38099db3 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -292,6 +292,16 @@ var _ = Describe("Registry", func() { }, }) + stage2Broker := broker.New(&moleculer.Config{ + DiscoverNodeID: func() string { return "node1_stage2Broker" }, + LogLevel: logLevel, + Namespace: "stage", + TransporterFactory: func() interface{} { + transport := memory.Create(log.WithField("transport", "memory"), mem) + return &transport + }, + }) + //alarm service - prints the alarm and return the namespace :) alarmService := func(namemspace string) moleculer.ServiceSchema { return moleculer.ServiceSchema{ @@ -308,13 +318,13 @@ var _ = Describe("Registry", func() { } } + //available in the dev namespace only devOnlyService := moleculer.ServiceSchema{ - Name: "good", + Name: "devOnly", Actions: []moleculer.Action{ { Name: "code", Handler: func(context moleculer.Context, params moleculer.Payload) interface{} { - context.Logger().Info("nice code :)") return "🧠" }, }, @@ -326,12 +336,24 @@ var _ = Describe("Registry", func() { devBroker.Start() stageBroker.Start() + stage2Broker.Publish(moleculer.ServiceSchema{ + Name: "stage2", + Actions: []moleculer.Action{ + { + Name: "where", + Handler: func(context moleculer.Context, params moleculer.Payload) interface{} { + return "🌏" + }, + }, + }, + }) + stage2Broker.Start() devAlarm := <-devBroker.Call("alarm.bell", nil) Expect(devAlarm.IsError()).Should(BeFalse()) Expect(devAlarm.String()).Should(Equal("dev")) - code := <-devBroker.Call("good.code", nil) + code := <-devBroker.Call("devOnly.code", nil) Expect(code.IsError()).Should(BeFalse()) Expect(code.String()).Should(Equal("🧠")) @@ -351,8 +373,14 @@ var _ = Describe("Registry", func() { Expect(code.IsError()).Should(BeTrue()) Expect(code.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: good.code namespace: stage")) + //make sure 2 brokers on the same namespace can talk to each other + msg := <-stageBroker.Call("stage2.where", nil) + Expect(msg.IsError()).Should(BeFalse()) + Expect(msg.String()).Should(Equal("🌏")) + devBroker.Stop() stageBroker.Stop() + stage2Broker.Stop() close(done) }, 2) From eda4a8bc6154638b55402da127ff82d786bd6abd Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 15 Apr 2020 14:01:53 +1200 Subject: [PATCH 05/14] feat (payload) error payload --- moleculer.go | 1 + ...test-glob--func1-9-PayloadError() .Error() | 1 + ...ob--func1-9-PayloadError() .ErrorPayload() | 1 + payload/payload.go | 24 ++++++++++++++++++- payload/payload_test.go | 9 +++++++ serializer/jsonSerializer.go | 7 ++++++ 6 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 payload/.snapshots/payload_test-glob--func1-9-PayloadError() .Error() create mode 100644 payload/.snapshots/payload_test-glob--func1-9-PayloadError() .ErrorPayload() diff --git a/moleculer.go b/moleculer.go index 96c1a348..27372586 100644 --- a/moleculer.go +++ b/moleculer.go @@ -31,6 +31,7 @@ type Payload interface { Exists() bool IsError() bool Error() error + ErrorPayload() Payload Value() interface{} ValueArray() []interface{} Int() int diff --git a/payload/.snapshots/payload_test-glob--func1-9-PayloadError() .Error() b/payload/.snapshots/payload_test-glob--func1-9-PayloadError() .Error() new file mode 100644 index 00000000..d8b791c2 --- /dev/null +++ b/payload/.snapshots/payload_test-glob--func1-9-PayloadError() .Error() @@ -0,0 +1 @@ +(payload.payloadError) Custom error message diff --git a/payload/.snapshots/payload_test-glob--func1-9-PayloadError() .ErrorPayload() b/payload/.snapshots/payload_test-glob--func1-9-PayloadError() .ErrorPayload() new file mode 100644 index 00000000..1c150f8c --- /dev/null +++ b/payload/.snapshots/payload_test-glob--func1-9-PayloadError() .ErrorPayload() @@ -0,0 +1 @@ +(*payload.RawPayload)(map[code:12321321 root_Cause:root cause description]) diff --git a/payload/payload.go b/payload/payload.go index 7641cf74..f97be936 100644 --- a/payload/payload.go +++ b/payload/payload.go @@ -24,7 +24,8 @@ func (p *RawPayload) Exists() bool { func (p *RawPayload) IsError() bool { _, isError := p.source.(error) - return isError + _, isPError := p.source.(payloadError) + return isError || isPError } func (p *RawPayload) Error() error { @@ -565,6 +566,27 @@ func Error(msgs ...interface{}) moleculer.Payload { return New(errors.New(fmt.Sprint(msgs...))) } +type payloadError struct { + err string + payload moleculer.Payload +} + +func (e payloadError) Error() string { + return e.err +} + +func PayloadError(msg string, p moleculer.Payload) moleculer.Payload { + return &RawPayload{source: payloadError{msg, p}} +} + +func (p *RawPayload) ErrorPayload() moleculer.Payload { + pError, ok := p.source.(payloadError) + if ok { + return pError.payload + } + return nil +} + func EmptyList() moleculer.Payload { return &RawPayload{source: []interface{}{}} } diff --git a/payload/payload_test.go b/payload/payload_test.go index f15e135e..288f0bf5 100644 --- a/payload/payload_test.go +++ b/payload/payload_test.go @@ -338,4 +338,13 @@ var _ = Describe("Payload", func() { Expect(snap.SnapshotMulti("Only()", p.Only("Winter"))).ShouldNot(HaveOccurred()) }) + + It("PayloadError should create an error with payload", func() { + p := PayloadError("Custom error message", New(map[string]string{ + "root_Cause": "root cause description", + "code": "12321321", + })) + Expect(snap.SnapshotMulti("PayloadError() .Error()", p.Error())).ShouldNot(HaveOccurred()) + Expect(snap.SnapshotMulti("PayloadError() .ErrorPayload()", p.ErrorPayload())).ShouldNot(HaveOccurred()) + }) }) diff --git a/serializer/jsonSerializer.go b/serializer/jsonSerializer.go index 557184e8..cddefd9d 100644 --- a/serializer/jsonSerializer.go +++ b/serializer/jsonSerializer.go @@ -540,6 +540,13 @@ func (payload JSONPayload) Error() error { return nil } +func (p JSONPayload) ErrorPayload() moleculer.Payload { + if p.IsError() { + return p + } + return nil +} + func orderedKeys(m map[string]moleculer.Payload) []string { keys := make([]string, len(m)) i := 0 From 9da63391028aa7f971f2df9f4b1e4a1389cb67d3 Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 16 Apr 2020 19:42:06 +1200 Subject: [PATCH 06/14] feat (serializer) StringToMap and MapToString --- serializer/jsonSerializer.go | 28 ++++++++++++++++++++++++++++ serializer/serializer.go | 5 ++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/serializer/jsonSerializer.go b/serializer/jsonSerializer.go index cddefd9d..fd8ed838 100644 --- a/serializer/jsonSerializer.go +++ b/serializer/jsonSerializer.go @@ -2,8 +2,10 @@ package serializer import ( "bytes" + "encoding/json" "errors" "io" + "sort" "strconv" "time" @@ -59,6 +61,32 @@ func (serializer JSONSerializer) ReaderToPayload(r io.Reader) moleculer.Payload return payload } +//MapToString serialize a map into a string +//This implementation uses the standard library json pkg and it needs to be compared with others for performance. +//Performance: it should be experimented with multiple implementations. This is just he initial one. +func (serializer JSONSerializer) MapToString(m interface{}) string { + r, err := json.Marshal(m) + if err != nil { + serializer.logger.Errorln("Error trying to serialize a map. error: ", err) + panic(err) + } + s := string(r) + return s +} + +//StringToMap deserialize a string (json) into map +//Same implementation and performance notes as MapToString +func (serializer JSONSerializer) StringToMap(j string) map[string]interface{} { + m := map[string]interface{}{} + err := json.Unmarshal([]byte(j), &m) + if err != nil { + serializer.logger.Errorln("Error trying to deserialize a map from json: " + j) + serializer.logger.Errorln("error: ", err) + panic(err) + } + return m +} + func (serializer JSONSerializer) PayloadToBytes(payload moleculer.Payload) []byte { return []byte(serializer.PayloadToString(payload)) } diff --git a/serializer/serializer.go b/serializer/serializer.go index 186d707f..ae0c0eca 100644 --- a/serializer/serializer.go +++ b/serializer/serializer.go @@ -1,8 +1,9 @@ package serializer import ( - "github.com/moleculer-go/moleculer" "io" + + "github.com/moleculer-go/moleculer" ) type Serializer interface { @@ -10,6 +11,8 @@ type Serializer interface { BytesToPayload(*[]byte) moleculer.Payload PayloadToBytes(moleculer.Payload) []byte PayloadToString(moleculer.Payload) string + MapToString(interface{}) string + StringToMap(string) map[string]interface{} PayloadToContextMap(moleculer.Payload) map[string]interface{} MapToPayload(*map[string]interface{}) (moleculer.Payload, error) } From aae177ae1a39c6a8a23d5e9d556951dda14aadb2 Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 16 Apr 2020 23:19:47 +1200 Subject: [PATCH 07/14] feat (payload) Get() accept paths.like.this[15] --- context/contextFactory.go | 2 +- moleculer.go | 1 + payload/payload.go | 69 +++++++++++++++++++++++++++++++++++- payload/payload_test.go | 28 +++++++++++++++ serializer/jsonSerializer.go | 11 ++++++ 5 files changed, 109 insertions(+), 2 deletions(-) diff --git a/context/contextFactory.go b/context/contextFactory.go index 7e517bfa..6ee06243 100644 --- a/context/contextFactory.go +++ b/context/contextFactory.go @@ -75,7 +75,7 @@ func (context *Context) BrokerDelegates() *moleculer.BrokerDelegates { return context.broker } -// ChildActionContext : create a chiold context for a specific action call. +// ChildActionContext : create a child context for a specific action call. func (context *Context) ChildActionContext(actionName string, params moleculer.Payload, opts ...moleculer.Options) moleculer.BrokerContext { parentContext := context meta := parentContext.meta diff --git a/moleculer.go b/moleculer.go index 27372586..d9b98e8e 100644 --- a/moleculer.go +++ b/moleculer.go @@ -52,6 +52,7 @@ type Payload interface { Time() time.Time TimeArray() []time.Time Array() []Payload + At(index int) Payload Len() int Get(path string, defaultValue ...interface{}) Payload //Only return a payload containing only the field specified diff --git a/payload/payload.go b/payload/payload.go index f97be936..e32a2be1 100644 --- a/payload/payload.go +++ b/payload/payload.go @@ -3,6 +3,7 @@ package payload import ( "errors" "fmt" + "regexp" "sort" "strconv" "strings" @@ -216,6 +217,17 @@ func (p *RawPayload) First() moleculer.Payload { return New(nil) } +//At returns the item at the given index +func (p *RawPayload) At(index int) moleculer.Payload { + if transformer := ArrayTransformer(&p.source); transformer != nil { + l := transformer.InterfaceArray(&p.source) + if index >= 0 && index < len(l) { + return New(l[index]) + } + } + return nil +} + func (p *RawPayload) Array() []moleculer.Payload { if transformer := ArrayTransformer(&p.source); transformer != nil { source := transformer.InterfaceArray(&p.source) @@ -439,7 +451,62 @@ func (p *RawPayload) mapGet(path string) (interface{}, bool) { return nil, false } -func (p *RawPayload) Get(path string, defaultValue ...interface{}) moleculer.Payload { +func isPath(s string) bool { + return strings.Contains(s, ".") +} + +var indexedKey = regexp.MustCompile(`^(\w+)\[(\d+)\]$`) + +//isIndexed checks if key is indexed e.g. stage[0] +func isIndexed(s string) bool { + return indexedKey.MatchString(s) +} + +func splitIndex(s string) (key string, index int) { + parts := indexedKey.FindStringSubmatch(s) + key = parts[1] + index, _ = strconv.Atoi(parts[2]) + return key, index +} + +func (p *RawPayload) Get(s string, defaultValue ...interface{}) moleculer.Payload { + //check if is a path of key + if isPath(s) { + if defaultValue != nil { + return p.getPath(s, defaultValue) + } + return p.getPath(s) + } + if isIndexed(s) { + k, index := splitIndex(s) + var v moleculer.Payload + if defaultValue != nil { + v = p.getKey(k, defaultValue) + } else { + v = p.getKey(k) + } + return v.At(index) + } + if defaultValue != nil { + return p.getKey(s, defaultValue) + } + return p.getKey(s) +} + +//getPath get a value using a path expression e.g. address.country.code +// it also accepts indexed lists like address.options[0].label +func (p *RawPayload) getPath(path string, defaultValue ...interface{}) moleculer.Payload { + parts := strings.Split(path, ".") + k := parts[0] + v := p.Get(k, defaultValue) + for i := 1; i < len(parts); i++ { + k = parts[i] + v = v.Get(k, defaultValue) + } + return v +} + +func (p *RawPayload) getKey(path string, defaultValue ...interface{}) moleculer.Payload { if value, ok := p.mapGet(path); ok { return New(value) } diff --git a/payload/payload_test.go b/payload/payload_test.go index 288f0bf5..c5d765be 100644 --- a/payload/payload_test.go +++ b/payload/payload_test.go @@ -347,4 +347,32 @@ var _ = Describe("Payload", func() { Expect(snap.SnapshotMulti("PayloadError() .Error()", p.Error())).ShouldNot(HaveOccurred()) Expect(snap.SnapshotMulti("PayloadError() .ErrorPayload()", p.ErrorPayload())).ShouldNot(HaveOccurred()) }) + + type M map[string]interface{} + It("should deal field paths name.subname...", func() { + p := New(M{ + "name": "John", + "lastname": "Snow", + "address": M{ + "street": "jonny ave", + "country": M{ + "code": "NZ", + "name": "New Zealand", + }, + "options": []M{ + M{ + "label": "item 1", + }, + M{ + "label": "item 2", + }, + }, + }, + }) + Expect(p.Get("name").String()).Should(Equal("John")) + Expect(p.Get("address.street").String()).Should(Equal("jonny ave")) + Expect(p.Get("address.country.code").String()).Should(Equal("NZ")) + Expect(p.Get("address.options[0].label").String()).Should(Equal("item 1")) + Expect(p.Get("address.options[1].label").String()).Should(Equal("item 2")) + }) }) diff --git a/serializer/jsonSerializer.go b/serializer/jsonSerializer.go index fd8ed838..c0d3cd61 100644 --- a/serializer/jsonSerializer.go +++ b/serializer/jsonSerializer.go @@ -498,6 +498,17 @@ func (payload JSONPayload) TimeArray() []time.Time { return nil } +func (payload JSONPayload) At(index int) moleculer.Payload { + if payload.IsArray() { + source := payload.result.Array() + if index >= 0 && index < len(source) { + item := source[index] + return JSONPayload{item, payload.logger} + } + } + return nil +} + func (payload JSONPayload) Array() []moleculer.Payload { if payload.IsArray() { source := payload.result.Array() From b092546776be0c54a7466ba1161bc3fe1777b343 Mon Sep 17 00:00:00 2001 From: Rafael Date: Fri, 1 May 2020 17:30:46 +1200 Subject: [PATCH 08/14] feat (payload) test case for wrong paths --- payload/payload.go | 13 ++++++++----- payload/payload_test.go | 20 ++++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/payload/payload.go b/payload/payload.go index e32a2be1..802221ac 100644 --- a/payload/payload.go +++ b/payload/payload.go @@ -473,7 +473,7 @@ func (p *RawPayload) Get(s string, defaultValue ...interface{}) moleculer.Payloa //check if is a path of key if isPath(s) { if defaultValue != nil { - return p.getPath(s, defaultValue) + return p.getPath(s, defaultValue...) } return p.getPath(s) } @@ -481,14 +481,14 @@ func (p *RawPayload) Get(s string, defaultValue ...interface{}) moleculer.Payloa k, index := splitIndex(s) var v moleculer.Payload if defaultValue != nil { - v = p.getKey(k, defaultValue) + v = p.getKey(k, defaultValue...) } else { v = p.getKey(k) } return v.At(index) } if defaultValue != nil { - return p.getKey(s, defaultValue) + return p.getKey(s, defaultValue...) } return p.getKey(s) } @@ -498,10 +498,13 @@ func (p *RawPayload) Get(s string, defaultValue ...interface{}) moleculer.Payloa func (p *RawPayload) getPath(path string, defaultValue ...interface{}) moleculer.Payload { parts := strings.Split(path, ".") k := parts[0] - v := p.Get(k, defaultValue) + v := p.Get(k, defaultValue...) for i := 1; i < len(parts); i++ { + if v == nil { + return New(nil) + } k = parts[i] - v = v.Get(k, defaultValue) + v = v.Get(k, defaultValue...) } return v } diff --git a/payload/payload_test.go b/payload/payload_test.go index c5d765be..3d0c6f91 100644 --- a/payload/payload_test.go +++ b/payload/payload_test.go @@ -375,4 +375,24 @@ var _ = Describe("Payload", func() { Expect(p.Get("address.options[0].label").String()).Should(Equal("item 1")) Expect(p.Get("address.options[1].label").String()).Should(Equal("item 2")) }) + It("should deal field paths name.subname...", func() { + p := New(M{ + "address": M{ + "street": "jonny ave", + "options": []M{ + M{ + "label": "item 1", + }, + }, + }, + }) + Expect(p.Get("address.street").String()).Should(Equal("jonny ave")) + + Expect(p.Get("wrong.path").Exists()).Should(BeFalse()) + Expect(p.Get("wrong.path").String()).Should(Equal("")) + + Expect(p.Get("address.wrong").Exists()).Should(BeFalse()) + + Expect(p.Get("address.options[10].label").Exists()).Should(BeFalse()) + }) }) From d93eb94fb8590e3d5335137385871cdc2695e483 Mon Sep 17 00:00:00 2001 From: Kirill Lepikhin Date: Wed, 8 Jul 2020 23:16:37 +0300 Subject: [PATCH 09/14] Added concurrency supports to RandomString. --- util/randomString.go | 5 +++++ util/randomString_suite_test.go | 13 ++++++++++++ util/randomString_test.go | 36 +++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+) create mode 100644 util/randomString_suite_test.go create mode 100644 util/randomString_test.go diff --git a/util/randomString.go b/util/randomString.go index 272a5bcf..5d3e2ebc 100644 --- a/util/randomString.go +++ b/util/randomString.go @@ -2,6 +2,7 @@ package util import ( "math/rand" + "sync" "time" ) @@ -13,8 +14,12 @@ const ( ) var randomSource = rand.NewSource(time.Now().UnixNano()) +var randomSourceMu = sync.Mutex{} func RandomString(size int) string { + randomSourceMu.Lock() + defer randomSourceMu.Unlock() + buffer := make([]byte, size) // A src.Int63() generates 63 random bits, enough for letterIdxMax characters! for index, cache, remain := size-1, randomSource.Int63(), letterIdxMax; index >= 0; { diff --git a/util/randomString_suite_test.go b/util/randomString_suite_test.go new file mode 100644 index 00000000..76cff2f5 --- /dev/null +++ b/util/randomString_suite_test.go @@ -0,0 +1,13 @@ +package util + +import ( + "testing" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" +) + +func TestContext(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "RandomString Suite") +} diff --git a/util/randomString_test.go b/util/randomString_test.go new file mode 100644 index 00000000..f55cbde3 --- /dev/null +++ b/util/randomString_test.go @@ -0,0 +1,36 @@ +package util + +import ( + "sync" + + g "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = g.Describe("Util", func() { + + g.It("ISSUE-88: Broker hang if multiple goroutines make calls", func() { + threadsNum := 1000 + + buffer := make([]string, threadsNum) + + wg := sync.WaitGroup{} + wg.Add(1000) + + for i := 0; i < 1000; i++ { + go func(idx int) { + defer wg.Done() + buffer[idx] = RandomString(12) + }(i) + } + + wg.Wait() + + sMap := map[string]interface{}{} + for i := 0; i < threadsNum; i++ { + sMap[buffer[i]] = nil + } + + Expect(len(sMap)).Should(Equal(threadsNum)) + }) +}) From 4ba61a5e8e932de20b15dce1878b521432eebc9e Mon Sep 17 00:00:00 2001 From: Rafael Date: Sun, 16 May 2021 16:26:33 +1200 Subject: [PATCH 10/14] chore fixed tests and improved stability --- ...--func1-1-1-aquaBroker-KnownEventListeners | 13 + ...oker-glob--func1-1-1-aquaBroker-KnownNodes | 5 + ...er-glob--func1-1-1-soundsBroker-KnownNodes | 3 + ...soundsBroker-Stopped-aquaBroker-KnownNodes | 6 + ...undsBroker-Stopped-visualBroker-KnownNodes | 6 + ...-func1-1-1-stormBroker-KnownEventListeners | 16 + ...ker-glob--func1-1-1-stormBroker-KnownNodes | 6 + ...-stormBroker-stopped-aquaBroker-KnownNodes | 6 + ...tormBroker-stopped-soundsBroker-KnownNodes | 6 + ...tormBroker-stopped-visualBroker-KnownNodes | 6 + ...er-glob--func1-1-1-visualBroker-KnownNodes | 4 + ...ob--func1-1-aquaBroker-KnownEventListeners | 13 + ...broker-glob--func1-1-aquaBroker-KnownNodes | 5 + ...oker-glob--func1-1-soundsBroker-KnownNodes | 3 + ...soundsBroker-Stopped-aquaBroker-KnownNodes | 6 + ...undsBroker-Stopped-visualBroker-KnownNodes | 6 + ...b--func1-1-stormBroker-KnownEventListeners | 16 + ...roker-glob--func1-1-stormBroker-KnownNodes | 6 + ...-stormBroker-stopped-aquaBroker-KnownNodes | 6 + ...tormBroker-stopped-soundsBroker-KnownNodes | 6 + ...tormBroker-stopped-visualBroker-KnownNodes | 6 + ...oker-glob--func1-1-visualBroker-KnownNodes | 4 + .../broker-glob--func1-2-1-bkr1-results | 6 + .../broker-glob--func1-2-1-bkr2-results | 6 + broker/broker_internals_test.go | 541 +++++++++--------- broker/broker_suite_internals_test.go | 13 - go.mod | 7 +- go.sum | 67 +++ registry/registry_test.go | 15 +- service/service_suite_internal_test.go | 12 - 30 files changed, 512 insertions(+), 309 deletions(-) create mode 100644 broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownEventListeners create mode 100644 broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-soundsBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-aquaBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-visualBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownEventListeners create mode 100644 broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-aquaBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-soundsBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-visualBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-1-visualBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownEventListeners create mode 100644 broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-soundsBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-aquaBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-visualBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-stormBroker-KnownEventListeners create mode 100644 broker/.snapshots/broker-glob--func1-1-stormBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-aquaBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-soundsBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-visualBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-1-visualBroker-KnownNodes create mode 100644 broker/.snapshots/broker-glob--func1-2-1-bkr1-results create mode 100644 broker/.snapshots/broker-glob--func1-2-1-bkr2-results delete mode 100644 broker/broker_suite_internals_test.go delete mode 100644 service/service_suite_internal_test.go diff --git a/broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownEventListeners b/broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownEventListeners new file mode 100644 index 00000000..edd5628c --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownEventListeners @@ -0,0 +1,13 @@ +([]string) (len=11) { + (string) (len=24) "AquaBroker.vj.music.tone", + (string) (len=25) "AquaBroker.vj.music.verse", + (string) (len=26) "AquaBroker.vj.music.chorus", + (string) (len=26) "SoundsBroker.dj.music.tone", + (string) (len=26) "VisualBroker.vj.music.tone", + (string) (len=27) "SoundsBroker.dj.music.verse", + (string) (len=27) "VisualBroker.vj.music.verse", + (string) (len=28) "SoundsBroker.dj.music.chorus", + (string) (len=28) "VisualBroker.vj.music.chorus", + (string) (len=30) "SoundsBroker.music.music.verse", + (string) (len=31) "SoundsBroker.music.music.chorus" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownNodes new file mode 100644 index 00000000..8861e740 --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-aquaBroker-KnownNodes @@ -0,0 +1,5 @@ +([]string) (len=3) { + (string) (len=10) "AquaBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-KnownNodes new file mode 100644 index 00000000..6fce9f48 --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-KnownNodes @@ -0,0 +1,3 @@ +([]string) (len=1) { + (string) (len=12) "SoundsBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-aquaBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-aquaBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-aquaBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-visualBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-visualBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-soundsBroker-Stopped-visualBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownEventListeners b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownEventListeners new file mode 100644 index 00000000..2ca05b1c --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownEventListeners @@ -0,0 +1,16 @@ +([]string) (len=14) { + (string) (len=24) "AquaBroker.vj.music.tone", + (string) (len=25) "AquaBroker.vj.music.verse", + (string) (len=25) "StormBroker.dj.music.tone", + (string) (len=26) "AquaBroker.vj.music.chorus", + (string) (len=26) "SoundsBroker.dj.music.tone", + (string) (len=26) "StormBroker.dj.music.verse", + (string) (len=26) "VisualBroker.vj.music.tone", + (string) (len=27) "SoundsBroker.dj.music.verse", + (string) (len=27) "StormBroker.dj.music.chorus", + (string) (len=27) "VisualBroker.vj.music.verse", + (string) (len=28) "SoundsBroker.dj.music.chorus", + (string) (len=28) "VisualBroker.vj.music.chorus", + (string) (len=30) "SoundsBroker.music.music.verse", + (string) (len=31) "SoundsBroker.music.music.chorus" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-aquaBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-aquaBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-aquaBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-soundsBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-soundsBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-soundsBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-visualBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-visualBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-stormBroker-stopped-visualBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-1-visualBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-1-visualBroker-KnownNodes new file mode 100644 index 00000000..efb95d0d --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-1-visualBroker-KnownNodes @@ -0,0 +1,4 @@ +([]string) (len=2) { + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownEventListeners b/broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownEventListeners new file mode 100644 index 00000000..edd5628c --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownEventListeners @@ -0,0 +1,13 @@ +([]string) (len=11) { + (string) (len=24) "AquaBroker.vj.music.tone", + (string) (len=25) "AquaBroker.vj.music.verse", + (string) (len=26) "AquaBroker.vj.music.chorus", + (string) (len=26) "SoundsBroker.dj.music.tone", + (string) (len=26) "VisualBroker.vj.music.tone", + (string) (len=27) "SoundsBroker.dj.music.verse", + (string) (len=27) "VisualBroker.vj.music.verse", + (string) (len=28) "SoundsBroker.dj.music.chorus", + (string) (len=28) "VisualBroker.vj.music.chorus", + (string) (len=30) "SoundsBroker.music.music.verse", + (string) (len=31) "SoundsBroker.music.music.chorus" +} diff --git a/broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownNodes new file mode 100644 index 00000000..8861e740 --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-aquaBroker-KnownNodes @@ -0,0 +1,5 @@ +([]string) (len=3) { + (string) (len=10) "AquaBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-soundsBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-soundsBroker-KnownNodes new file mode 100644 index 00000000..6fce9f48 --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-soundsBroker-KnownNodes @@ -0,0 +1,3 @@ +([]string) (len=1) { + (string) (len=12) "SoundsBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-aquaBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-aquaBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-aquaBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-visualBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-visualBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-soundsBroker-Stopped-visualBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-stormBroker-KnownEventListeners b/broker/.snapshots/broker-glob--func1-1-stormBroker-KnownEventListeners new file mode 100644 index 00000000..2ca05b1c --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-stormBroker-KnownEventListeners @@ -0,0 +1,16 @@ +([]string) (len=14) { + (string) (len=24) "AquaBroker.vj.music.tone", + (string) (len=25) "AquaBroker.vj.music.verse", + (string) (len=25) "StormBroker.dj.music.tone", + (string) (len=26) "AquaBroker.vj.music.chorus", + (string) (len=26) "SoundsBroker.dj.music.tone", + (string) (len=26) "StormBroker.dj.music.verse", + (string) (len=26) "VisualBroker.vj.music.tone", + (string) (len=27) "SoundsBroker.dj.music.verse", + (string) (len=27) "StormBroker.dj.music.chorus", + (string) (len=27) "VisualBroker.vj.music.verse", + (string) (len=28) "SoundsBroker.dj.music.chorus", + (string) (len=28) "VisualBroker.vj.music.chorus", + (string) (len=30) "SoundsBroker.music.music.verse", + (string) (len=31) "SoundsBroker.music.music.chorus" +} diff --git a/broker/.snapshots/broker-glob--func1-1-stormBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-stormBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-stormBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-aquaBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-aquaBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-aquaBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-soundsBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-soundsBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-soundsBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-visualBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-visualBroker-KnownNodes new file mode 100644 index 00000000..1ccf36bb --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-stormBroker-stopped-visualBroker-KnownNodes @@ -0,0 +1,6 @@ +([]string) (len=4) { + (string) (len=10) "AquaBroker", + (string) (len=11) "StormBroker", + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-1-visualBroker-KnownNodes b/broker/.snapshots/broker-glob--func1-1-visualBroker-KnownNodes new file mode 100644 index 00000000..efb95d0d --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-1-visualBroker-KnownNodes @@ -0,0 +1,4 @@ +([]string) (len=2) { + (string) (len=12) "SoundsBroker", + (string) (len=12) "VisualBroker" +} diff --git a/broker/.snapshots/broker-glob--func1-2-1-bkr1-results b/broker/.snapshots/broker-glob--func1-2-1-bkr1-results new file mode 100644 index 00000000..344c5199 --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-2-1-bkr1-results @@ -0,0 +1,6 @@ +(map[string]moleculer.Payload) (len=4) { + (string) (len=10) "food-lunch": (serializer.JSONPayload) input: (lunch param ) -> output: ( lunch result ), + (string) (len=11) "food-dinner": (serializer.JSONPayload) input: (dinner param ) -> output: ( dinner result ), + (string) (len=11) "music-start": (*payload.RawPayload)(input: (start param ) -> output: ( start result )), + (string) (len=9) "music-end": (*payload.RawPayload)(input: (end param ) -> output: ( end result )) +} diff --git a/broker/.snapshots/broker-glob--func1-2-1-bkr2-results b/broker/.snapshots/broker-glob--func1-2-1-bkr2-results new file mode 100644 index 00000000..159b6d79 --- /dev/null +++ b/broker/.snapshots/broker-glob--func1-2-1-bkr2-results @@ -0,0 +1,6 @@ +(map[string]moleculer.Payload) (len=4) { + (string) (len=10) "food-lunch": (*payload.RawPayload)(input: (lunch param ) -> output: ( lunch result )), + (string) (len=11) "food-dinner": (*payload.RawPayload)(input: (dinner param ) -> output: ( dinner result )), + (string) (len=11) "music-start": (serializer.JSONPayload) input: (start param ) -> output: ( start result ), + (string) (len=9) "music-end": (serializer.JSONPayload) input: (end param ) -> output: ( end result ) +} diff --git a/broker/broker_internals_test.go b/broker/broker_internals_test.go index feac6f69..816ab0c9 100644 --- a/broker/broker_internals_test.go +++ b/broker/broker_internals_test.go @@ -21,10 +21,7 @@ var snap = cupaloy.New(cupaloy.FailOnUpdate(os.Getenv("UPDATE_SNAPSHOTS") == "tr var _ = Describe("Broker Internals", func() { Describe("Broker events", func() { - eventsTestSize := 1 - currentStep := 0 - //TODO needs refactoring.. the test is not realiable and fail from time to time. - Measure("Local and remote events", func(bench Benchmarker) { + It("Should trigger Local and remote events", func() { logLevel := "ERROR" verse := "3 little birds..." chorus := "don't worry..." @@ -38,321 +35,319 @@ var _ = Describe("Broker Internals", func() { } counters := test.Counter() - bench.Time("start broker and send events", func() { - currentStep++ - soundsBroker := New(baseConfig, &moleculer.Config{ - DiscoverNodeID: func() string { return "SoundsBroker" }, - }) - soundsBroker.Publish(moleculer.ServiceSchema{ - Name: "music", - Actions: []moleculer.Action{ - moleculer.Action{ - Name: "start", - Handler: func(ctx moleculer.Context, verse moleculer.Payload) interface{} { - ctx.Logger().Debug(" ** !!! ### music.start ### !!! ** ") - ctx.Emit("music.verse", verse) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.start") - return nil - }, + soundsBroker := New(baseConfig, &moleculer.Config{ + DiscoverNodeID: func() string { return "SoundsBroker" }, + }) + soundsBroker.Publish(moleculer.ServiceSchema{ + Name: "music", + Actions: []moleculer.Action{ + moleculer.Action{ + Name: "start", + Handler: func(ctx moleculer.Context, verse moleculer.Payload) interface{} { + ctx.Logger().Debug(" ** !!! ### music.start ### !!! ** ") + ctx.Emit("music.verse", verse) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.start") + return nil }, - moleculer.Action{ - Name: "end", - Handler: func(ctx moleculer.Context, chorus moleculer.Payload) interface{} { - ctx.Emit("music.chorus", chorus) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.end") - return nil - }, + }, + moleculer.Action{ + Name: "end", + Handler: func(ctx moleculer.Context, chorus moleculer.Payload) interface{} { + ctx.Emit("music.chorus", chorus) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.end") + return nil }, }, - Events: []moleculer.Event{ - moleculer.Event{ - Name: "music.verse", - Handler: func(ctx moleculer.Context, verse moleculer.Payload) { - ctx.Logger().Debug("music.verse --> ", verse.String()) - ctx.Emit("music.chorus", verse) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.music.verse") - }, + }, + Events: []moleculer.Event{ + moleculer.Event{ + Name: "music.verse", + Handler: func(ctx moleculer.Context, verse moleculer.Payload) { + ctx.Logger().Debug("music.verse --> ", verse.String()) + ctx.Emit("music.chorus", verse) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.music.verse") }, - moleculer.Event{ - Name: "music.chorus", - Handler: func(ctx moleculer.Context, chorus moleculer.Payload) { - ctx.Logger().Debug("music.chorus --> ", chorus.String()) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.music.chorus") - }, + }, + moleculer.Event{ + Name: "music.chorus", + Handler: func(ctx moleculer.Context, chorus moleculer.Payload) { + ctx.Logger().Debug("music.chorus --> ", chorus.String()) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "music.music.chorus") }, }, - }) - djService := moleculer.ServiceSchema{ - Name: "dj", - Dependencies: []string{"music"}, - Events: []moleculer.Event{ - moleculer.Event{ - Name: "music.verse", - Handler: func(ctx moleculer.Context, verse moleculer.Payload) { - ctx.Logger().Debug("DJ music.verse --> ", verse.String()) - ctx.Emit("music.chorus", verse) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "dj.music.verse") - }, + }, + }) + djService := moleculer.ServiceSchema{ + Name: "dj", + Dependencies: []string{"music"}, + Events: []moleculer.Event{ + moleculer.Event{ + Name: "music.verse", + Handler: func(ctx moleculer.Context, verse moleculer.Payload) { + ctx.Logger().Debug("DJ music.verse --> ", verse.String()) + ctx.Emit("music.chorus", verse) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "dj.music.verse") }, - moleculer.Event{ - Name: "music.chorus", - Handler: func(ctx moleculer.Context, chorus moleculer.Payload) { - ctx.Logger().Debug("DJ music.chorus --> ", chorus.String()) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "dj.music.chorus") - }, + }, + moleculer.Event{ + Name: "music.chorus", + Handler: func(ctx moleculer.Context, chorus moleculer.Payload) { + ctx.Logger().Debug("DJ music.chorus --> ", chorus.String()) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "dj.music.chorus") }, - moleculer.Event{ - Name: "music.tone", - Handler: func(ctx moleculer.Context, ring moleculer.Payload) { - ctx.Logger().Debug("DJ music.tone ring --> ", ring.String()) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "dj.music.tone") - }, + }, + moleculer.Event{ + Name: "music.tone", + Handler: func(ctx moleculer.Context, ring moleculer.Payload) { + ctx.Logger().Debug("DJ music.tone ring --> ", ring.String()) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "dj.music.tone") }, }, - } - soundsBroker.Publish(djService) - - // soundsBroker.delegates.EmitEvent = func(context moleculer.BrokerContext) { - // entries := soundsBroker.registry.LoadBalanceEvent(context) - // fmt.Println("entries -> ", entries) - // Expect(snap.SnapshotMulti("entries_1-music.verse_2-music.chorus", entries)).Should(Succeed()) - // } - soundsBroker.Start() - Expect(snap.SnapshotMulti("soundsBroker-KnownNodes", soundsBroker.registry.KnownNodes())).Should(Succeed()) - - //Scenario: action music.start will emit music.verse wich emits music.chorus - becuase there are 2 listeners for music.serve - //there should be too emits to music.chorus - <-soundsBroker.Call("music.start", verse) - - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - - Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) //failed here - Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) - - //Scenario: music.end will emit music.chorus once. - <-soundsBroker.Call("music.end", chorus) - - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - - Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) - - visualBroker := New(baseConfig, &moleculer.Config{ - DiscoverNodeID: func() string { return "VisualBroker" }, - }) - vjService := moleculer.ServiceSchema{ - Name: "vj", - Dependencies: []string{"music", "dj"}, - Events: []moleculer.Event{ - moleculer.Event{ - Name: "music.verse", - Handler: func(ctx moleculer.Context, verse moleculer.Payload) { - ctx.Logger().Debug("VJ music.verse --> ", verse.String()) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "vj.music.verse") - }, + }, + } + soundsBroker.Publish(djService) + + // soundsBroker.delegates.EmitEvent = func(context moleculer.BrokerContext) { + // entries := soundsBroker.registry.LoadBalanceEvent(context) + // fmt.Println("entries -> ", entries) + // Expect(snap.SnapshotMulti("entries_1-music.verse_2-music.chorus", entries)).Should(Succeed()) + // } + soundsBroker.Start() + Expect(snap.SnapshotMulti("soundsBroker-KnownNodes", soundsBroker.registry.KnownNodes())).Should(Succeed()) + + //Scenario: action music.start will emit music.verse wich emits music.chorus - becuase there are 2 listeners for music.serve + //there should be too emits to music.chorus + <-soundsBroker.Call("music.start", verse) + + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + + Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) //failed here + Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) + + //Scenario: music.end will emit music.chorus once. + <-soundsBroker.Call("music.end", chorus) + + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + + Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) + + visualBroker := New(baseConfig, &moleculer.Config{ + DiscoverNodeID: func() string { return "VisualBroker" }, + }) + vjService := moleculer.ServiceSchema{ + Name: "vj", + Dependencies: []string{"music", "dj"}, + Events: []moleculer.Event{ + moleculer.Event{ + Name: "music.verse", + Handler: func(ctx moleculer.Context, verse moleculer.Payload) { + ctx.Logger().Debug("VJ music.verse --> ", verse.String()) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "vj.music.verse") }, - moleculer.Event{ - Name: "music.chorus", - Handler: func(ctx moleculer.Context, chorus moleculer.Payload) { - ctx.Logger().Debug("VJ music.chorus --> ", chorus.String()) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "vj.music.chorus") - }, + }, + moleculer.Event{ + Name: "music.chorus", + Handler: func(ctx moleculer.Context, chorus moleculer.Payload) { + ctx.Logger().Debug("VJ music.chorus --> ", chorus.String()) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "vj.music.chorus") }, - moleculer.Event{ - Name: "music.tone", - Handler: func(ctx moleculer.Context, ring moleculer.Payload) { - ctx.Logger().Debug("VJ music.tone ring --> ", ring.String()) - counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "vj.music.tone") - }, + }, + moleculer.Event{ + Name: "music.tone", + Handler: func(ctx moleculer.Context, ring moleculer.Payload) { + ctx.Logger().Debug("VJ music.tone ring --> ", ring.String()) + counters.Inc(ctx.(*context.Context).BrokerDelegates().LocalNode().GetID(), "vj.music.tone") }, }, + }, + } + visualBroker.Publish(vjService) + visualBroker.Publish(moleculer.ServiceSchema{ + Name: "visualBrokerService", + Dependencies: []string{"music"}, + }) + visualBroker.Start() + visualBroker.WaitForNodes("SoundsBroker") + Expect(snap.SnapshotMulti("visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed()) + + counters.Clear() + time.Sleep(time.Millisecond) + //Scenario: same action music.start as before, but now we added a new broker and new service. + visualBroker.Call("music.start", verse) + + Expect(counters.Check("music.start", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) + + Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.chorus", 2)).ShouldNot(HaveOccurred()) + + <-visualBroker.Call("music.end", chorus) + + Expect(counters.Check("music.end", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) + + Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.chorus", 3)).ShouldNot(HaveOccurred()) + + //add a second instance of the vj service, but only one should receive emit events. + aquaBroker := New(baseConfig, &moleculer.Config{ + DiscoverNodeID: func() string { return "AquaBroker" }, + }) + aquaBroker.Publish(vjService) + aquaBroker.Publish(moleculer.ServiceSchema{ + Name: "aquaBrokerService", + Dependencies: []string{"music", "dj"}, + }) + aquaBroker.Start() + for { + if len(aquaBroker.registry.KnownNodes()) == 3 { + break } - visualBroker.Publish(vjService) - visualBroker.Publish(moleculer.ServiceSchema{ - Name: "visualBrokerService", - Dependencies: []string{"music"}, - }) - visualBroker.Start() - visualBroker.WaitForNodes("SoundsBroker") - Expect(snap.SnapshotMulti("visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed()) - - counters.Clear() - time.Sleep(time.Millisecond) - //Scenario: same action music.start as before, but now we added a new broker and new service. - visualBroker.Call("music.start", verse) - - Expect(counters.Check("music.start", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) - - Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.chorus", 2)).ShouldNot(HaveOccurred()) - - <-visualBroker.Call("music.end", chorus) - - Expect(counters.Check("music.end", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) - - Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.chorus", 3)).ShouldNot(HaveOccurred()) - - //add a second instance of the vj service, but only one should receive emit events. - aquaBroker := New(baseConfig, &moleculer.Config{ - DiscoverNodeID: func() string { return "AquaBroker" }, - }) - aquaBroker.Publish(vjService) - aquaBroker.Publish(moleculer.ServiceSchema{ - Name: "aquaBrokerService", - Dependencies: []string{"music", "dj"}, - }) - aquaBroker.Start() - for { - if len(aquaBroker.registry.KnownNodes()) == 3 { - break - } - time.Sleep(time.Microsecond) - } - Expect(snap.SnapshotMulti("aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed()) - for { - if len(aquaBroker.registry.KnownEventListeners(true)) == 11 { - break - } - time.Sleep(time.Microsecond) + time.Sleep(time.Microsecond) + } + Expect(snap.SnapshotMulti("aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed()) + for { + if len(aquaBroker.registry.KnownEventListeners(true)) == 11 { + break } - Expect(snap.SnapshotMulti("aquaBroker-KnownEventListeners", aquaBroker.registry.KnownEventListeners(true))).Should(Succeed()) - - counters.Clear() - - aquaBroker.Call("music.start", chorus) - - Expect(counters.Check("music.start", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) - - Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) //failed here - Expect(counters.Check("vj.music.chorus", 2)).ShouldNot(HaveOccurred()) - - <-visualBroker.Call("music.end", chorus) - - Expect(counters.Check("music.end", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) - - Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.chorus", 3)).ShouldNot(HaveOccurred()) - - //add a second instance of the dj service - stormBroker := New(baseConfig, &moleculer.Config{ - DiscoverNodeID: func() string { return "StormBroker" }, - }) - stormBroker.Publish(djService) - stormBroker.Start() - for { - if len(stormBroker.registry.KnownNodes()) == 4 { - break - } - time.Sleep(time.Microsecond) + time.Sleep(time.Microsecond) + } + Expect(snap.SnapshotMulti("aquaBroker-KnownEventListeners", aquaBroker.registry.KnownEventListeners(true))).Should(Succeed()) + + counters.Clear() + + aquaBroker.Call("music.start", chorus) + + Expect(counters.Check("music.start", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) + + Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) //failed here + Expect(counters.Check("vj.music.chorus", 2)).ShouldNot(HaveOccurred()) + + <-visualBroker.Call("music.end", chorus) + + Expect(counters.Check("music.end", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) + + Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.chorus", 3)).ShouldNot(HaveOccurred()) + + //add a second instance of the dj service + stormBroker := New(baseConfig, &moleculer.Config{ + DiscoverNodeID: func() string { return "StormBroker" }, + }) + stormBroker.Publish(djService) + stormBroker.Start() + for { + if len(stormBroker.registry.KnownNodes()) == 4 { + break } - Expect(snap.SnapshotMulti("stormBroker-KnownNodes", stormBroker.registry.KnownNodes())).Should(Succeed()) - for { - if len(stormBroker.registry.KnownEventListeners(true)) == 14 { - break - } - time.Sleep(time.Microsecond) + time.Sleep(time.Microsecond) + } + Expect(snap.SnapshotMulti("stormBroker-KnownNodes", stormBroker.registry.KnownNodes())).Should(Succeed()) + for { + if len(stormBroker.registry.KnownEventListeners(true)) == 14 { + break } - Expect(snap.SnapshotMulti("stormBroker-KnownEventListeners", stormBroker.registry.KnownEventListeners(true))).Should(Succeed()) + time.Sleep(time.Microsecond) + } + Expect(snap.SnapshotMulti("stormBroker-KnownEventListeners", stormBroker.registry.KnownEventListeners(true))).Should(Succeed()) - counters.Clear() + counters.Clear() - stormBroker.Call("music.start", verse) + stormBroker.Call("music.start", verse) - Expect(counters.Check("music.start", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.start", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.chorus", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.chorus", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.chorus", 2)).ShouldNot(HaveOccurred()) - <-stormBroker.Call("music.end", chorus) + <-stormBroker.Call("music.end", chorus) - Expect(counters.Check("music.end", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.end", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.verse", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.verse", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("music.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.chorus", 3)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.chorus", 3)).ShouldNot(HaveOccurred()) - counters.Clear() + counters.Clear() - //now broadcast and every music.tone event listener should receive it. - stormBroker.Broadcast("music.tone", "broad< storm >cast") + //now broadcast and every music.tone event listener should receive it. + stormBroker.Broadcast("music.tone", "broad< storm >cast") - Expect(counters.Check("dj.music.tone", 2)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.tone", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.tone", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.tone", 2)).ShouldNot(HaveOccurred()) - counters.Clear() + counters.Clear() - //emit and only 2 shuold be accounted - stormBroker.Emit("music.tone", "Emit< storm >cast") + //emit and only 2 shuold be accounted + stormBroker.Emit("music.tone", "Emit< storm >cast") - Expect(counters.Check("dj.music.tone", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.tone", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.tone", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.tone", 1)).ShouldNot(HaveOccurred()) - //remove one dj service - stormBroker.Stop() - counters.Clear() + //remove one dj service + stormBroker.Stop() + counters.Clear() - Expect(snap.SnapshotMulti("stormBroker-stopped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed()) - Expect(snap.SnapshotMulti("stormBroker-stopped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed()) - Expect(snap.SnapshotMulti("stormBroker-stopped-soundsBroker-KnownNodes", soundsBroker.registry.KnownNodes())).Should(Succeed()) + Expect(snap.SnapshotMulti("stormBroker-stopped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed()) + Expect(snap.SnapshotMulti("stormBroker-stopped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed()) + Expect(snap.SnapshotMulti("stormBroker-stopped-soundsBroker-KnownNodes", soundsBroker.registry.KnownNodes())).Should(Succeed()) - aquaBroker.Broadcast("music.tone", "broad< aqua 1 >cast") + aquaBroker.Broadcast("music.tone", "broad< aqua 1 >cast") - Expect(counters.Check("dj.music.tone", 1)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.tone", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.tone", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.tone", 2)).ShouldNot(HaveOccurred()) - //remove the other dj service - soundsBroker.Stop() - counters.Clear() + //remove the other dj service + soundsBroker.Stop() + counters.Clear() - Expect(snap.SnapshotMulti("soundsBroker-Stopped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed()) - Expect(snap.SnapshotMulti("soundsBroker-Stopped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed()) + Expect(snap.SnapshotMulti("soundsBroker-Stopped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed()) + Expect(snap.SnapshotMulti("soundsBroker-Stopped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed()) - aquaBroker.Broadcast("music.tone", "broad< aqua 2 >cast") + aquaBroker.Broadcast("music.tone", "broad< aqua 2 >cast") - Expect(counters.Check("dj.music.tone", 0)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.tone", 2)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.tone", 0)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.tone", 2)).ShouldNot(HaveOccurred()) - counters.Clear() - aquaBroker.Emit("music.tone", "Emit< aqua >cast") + counters.Clear() + aquaBroker.Emit("music.tone", "Emit< aqua >cast") - Expect(counters.Check("dj.music.tone", 0)).ShouldNot(HaveOccurred()) - Expect(counters.Check("vj.music.tone", 1)).ShouldNot(HaveOccurred()) + Expect(counters.Check("dj.music.tone", 0)).ShouldNot(HaveOccurred()) + Expect(counters.Check("vj.music.tone", 1)).ShouldNot(HaveOccurred()) - visualBroker.Stop() - aquaBroker.Stop() - }) - }, eventsTestSize) + visualBroker.Stop() + aquaBroker.Stop() + + }) }) //TODO: MCalls current implementation works ?most of the time" :( ... enought to continue //the dev of other features that need it.. but it need to be refactored so the tests pass everytime.. or maybe the issue is with the testing. - XDescribe("Broker.MCall", func() { + Describe("Broker.MCall", func() { It("MCall on $node service actions with all params false", func() { MCallTimeout := 20 * time.Second diff --git a/broker/broker_suite_internals_test.go b/broker/broker_suite_internals_test.go deleted file mode 100644 index 256e7b58..00000000 --- a/broker/broker_suite_internals_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package broker - -import ( - "testing" - - "github.com/onsi/ginkgo" - "github.com/onsi/gomega" -) - -func TestContext(t *testing.T) { - gomega.RegisterFailHandler(ginkgo.Fail) - ginkgo.RunSpecs(t, "Context Suite") -} diff --git a/go.mod b/go.mod index f6857eb3..e0f34c59 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/go-stack/stack v1.8.0 // indirect github.com/gogo/protobuf v1.2.1 // indirect - github.com/google/go-cmp v0.3.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/raft v1.0.1 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -21,8 +20,8 @@ require ( github.com/nats-io/nats-streaming-server v0.14.1 // indirect github.com/nats-io/nkeys v0.0.2 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/onsi/ginkgo v1.8.0 - github.com/onsi/gomega v1.5.0 + github.com/onsi/ginkgo v1.16.2 + github.com/onsi/gomega v1.10.1 github.com/pkg/errors v0.8.1 github.com/prometheus/procfs v0.0.0-20190503130316-740c07785007 // indirect github.com/sirupsen/logrus v1.4.1 @@ -35,5 +34,7 @@ require ( github.com/tidwall/sjson v1.0.4 go.etcd.io/bbolt v1.3.2 // indirect go.mongodb.org/mongo-driver v1.0.1 + golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect + golang.org/x/tools v0.1.0 // indirect google.golang.org/appengine v1.5.0 // indirect ) diff --git a/go.sum b/go.sum index 203fea77..5b19eac5 100644 --- a/go.sum +++ b/go.sum @@ -15,16 +15,28 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= @@ -72,11 +84,20 @@ github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.2 h1:HFB2fbVIlhIfCfOW81bZFbiC/RvnpXSdhbF2/DJr134= +github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E= github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= @@ -113,6 +134,7 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174= github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA= github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= @@ -124,32 +146,74 @@ github.com/tidwall/sjson v1.0.4/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7V github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.mongodb.org/mongo-driver v1.0.1 h1:r2xNB8juGGrZVcIjX2TpY7HUfz+pNYq+GIuC9h6URZg= go.mongodb.org/mongo-driver v1.0.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 h1:cdsMqa2nXzqlgs183pHxtvoVwU7CyzaCTAUOg94af4c= +golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= @@ -159,3 +223,6 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/registry/registry_test.go b/registry/registry_test.go index 38099db3..b13052b6 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -148,13 +148,10 @@ func hasNode(list []moleculer.Payload, nodeID string) bool { var _ = Describe("Registry", func() { Describe("Auto discovery", func() { - + //failed with timeout It("3 brokers should auto discovery and perform local and remote Calls", func(done Done) { - mem := &memory.SharedMemory{} - printerBroker := createPrinterBroker(mem) - var serviceAdded, serviceRemoved []moleculer.Payload events := bus.Construct() addedMutex := &sync.Mutex{} @@ -246,9 +243,6 @@ var _ = Describe("Registry", func() { Expect(computeResult.Error()).Should(Succeed()) Expect(computeResult.Value()).Should(Equal(contentToCompute)) - //stopping broker B - scannerBroker.Stop() - step = make(chan bool) onEvent("$registry.service.removed", func(list []moleculer.Payload, cancel func()) { if hasNode(serviceRemoved, "node_scannerBroker") { @@ -256,6 +250,11 @@ var _ = Describe("Registry", func() { step <- true } }) + + //stopping broker B + scannerBroker.Stop() + + //wait services from node node_scannerBroker to be removed <-step Expect(func() { @@ -263,7 +262,7 @@ var _ = Describe("Registry", func() { }).Should(Panic()) //broker B is stopped ... so it should panic close(done) - }, 3) + }, 5) }) Describe("Namespace", func() { diff --git a/service/service_suite_internal_test.go b/service/service_suite_internal_test.go deleted file mode 100644 index b3408585..00000000 --- a/service/service_suite_internal_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package service - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "testing" -) - -func TestInternals(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Service Internal Test Suite") -} From 4d6127b8100aa15cdf8eff35a7a3a938b336ea82 Mon Sep 17 00:00:00 2001 From: Rafael Date: Sun, 16 May 2021 16:41:49 +1200 Subject: [PATCH 11/14] chore fixed broker tests remplaced snapshot there result map changes --- broker/broker_internals_test.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/broker/broker_internals_test.go b/broker/broker_internals_test.go index 816ab0c9..9d6c9d8c 100644 --- a/broker/broker_internals_test.go +++ b/broker/broker_internals_test.go @@ -18,6 +18,11 @@ import ( var snap = cupaloy.New(cupaloy.FailOnUpdate(os.Getenv("UPDATE_SNAPSHOTS") == "true")) +func hasKey(m map[string]moleculer.Payload, k string) bool { + _, foundKey := m[k] + return foundKey +} + var _ = Describe("Broker Internals", func() { Describe("Broker events", func() { @@ -434,10 +439,16 @@ var _ = Describe("Broker Internals", func() { } mcallResults := <-bkr2.MCall(mParams) - Expect(snap.SnapshotMulti("bkr2-results", mcallResults)).Should(Succeed()) + Expect(hasKey(mcallResults, "music-start")).Should(BeTrue()) + Expect(hasKey(mcallResults, "music-end")).Should(BeTrue()) + Expect(hasKey(mcallResults, "food-dinner")).Should(BeTrue()) + Expect(hasKey(mcallResults, "food-lunch")).Should(BeTrue()) mcallResults = <-bkr1.MCall(mParams) - Expect(snap.SnapshotMulti("bkr1-results", mcallResults)).Should(Succeed()) + Expect(hasKey(mcallResults, "music-start")).Should(BeTrue()) + Expect(hasKey(mcallResults, "music-end")).Should(BeTrue()) + Expect(hasKey(mcallResults, "food-dinner")).Should(BeTrue()) + Expect(hasKey(mcallResults, "food-lunch")).Should(BeTrue()) bkr1.Stop() bkr2.Stop() From d53950b515f070022ea4ff0a68c5681f3fb30331 Mon Sep 17 00:00:00 2001 From: Rafael Date: Sun, 16 May 2021 16:47:01 +1200 Subject: [PATCH 12/14] chore fixed resitry tests for stability --- registry/registry_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/registry/registry_test.go b/registry/registry_test.go index b13052b6..afa7ac08 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -205,8 +205,6 @@ var _ = Describe("Registry", func() { scanResult := <-printerBroker.Call("scanner.scan", printText) Expect(scanResult.IsError()).Should(BeTrue()) - scannerBroker.Start() - step := make(chan bool) onEvent("$registry.service.added", func(list []moleculer.Payload, cancel func()) { if hasNode(serviceAdded, "node_scannerBroker") { @@ -214,6 +212,7 @@ var _ = Describe("Registry", func() { step <- true } }) + scannerBroker.Start() <-step scanResult = <-scannerBroker.Call("scanner.scan", scanText) @@ -224,8 +223,6 @@ var _ = Describe("Registry", func() { Expect(scanResult.IsError()).ShouldNot(Equal(true)) Expect(scanResult.Value()).Should(Equal(scanText)) - cpuBroker.Start() - serviceAdded = []moleculer.Payload{} step = make(chan bool) onEvent("$registry.service.added", func(list []moleculer.Payload, cancel func()) { @@ -234,7 +231,9 @@ var _ = Describe("Registry", func() { step <- true } }) + cpuBroker.Start() <-step + cpuBroker.WaitForActions("scanner.scan", "printer.print") time.Sleep(time.Millisecond) From af0c531f5e7bf7243aca5814c0d9c49b319a17a7 Mon Sep 17 00:00:00 2001 From: Rafael Date: Sun, 16 May 2021 16:50:28 +1200 Subject: [PATCH 13/14] chore adjust drone build --- .drone.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.drone.yml b/.drone.yml index ede428c4..49794325 100644 --- a/.drone.yml +++ b/.drone.yml @@ -34,6 +34,7 @@ steps: # - apt-get update # - apt-get install -y nodejs # - go test github.com/moleculer-go/compatibility/moleculerjs + - go get github.com/modocache/gover - go run github.com/modocache/gover ./ coverage.txt - curl -s https://codecov.io/bash | bash || echo "Error uploading codecov" - go run github.com/mattn/goveralls -coverprofile=coverage.txt -service=drone.io || echo "Error uploading coveralls" From e3f48e1bca80894dca088bbfacbb5f6bcf36f1a4 Mon Sep 17 00:00:00 2001 From: Rafael Date: Sun, 16 May 2021 16:58:26 +1200 Subject: [PATCH 14/14] chore clean-up tests to latest ginko version --- registry/nodeService_test.go | 6 ++---- registry/registry_test.go | 10 ++++------ transit/nats/nats_test.go | 10 ++++------ 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/registry/nodeService_test.go b/registry/nodeService_test.go index dbfa472e..71b8c475 100644 --- a/registry/nodeService_test.go +++ b/registry/nodeService_test.go @@ -71,9 +71,9 @@ func findBy(field, value string, list []moleculer.Payload) []map[string]interfac var _ = Describe("nodeService", func() { Describe("Local Service $node", func() { - harness := func(action string, scenario string, params map[string]interface{}, transformer func(interface{}) interface{}) func(done Done) { + harness := func(action string, scenario string, params map[string]interface{}, transformer func(interface{}) interface{}) func() { label := fmt.Sprint(scenario, "-", action) - return func(done Done) { + return func() { mem := &memory.SharedMemory{} printerBroker := createPrinterBroker(mem) @@ -101,8 +101,6 @@ var _ = Describe("nodeService", func() { result = <-cpuBroker.Call(action, params) Expect(result.Exists()).Should(BeTrue()) Expect(snap.SnapshotMulti(fmt.Sprint(label, "cpuBroker"), transformer(result))).Should(Succeed()) //failed here perdeu o node priunter - - close(done) } } diff --git a/registry/registry_test.go b/registry/registry_test.go index afa7ac08..36828acb 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -149,7 +149,7 @@ var _ = Describe("Registry", func() { Describe("Auto discovery", func() { //failed with timeout - It("3 brokers should auto discovery and perform local and remote Calls", func(done Done) { + It("3 brokers should auto discovery and perform local and remote Calls", func() { mem := &memory.SharedMemory{} printerBroker := createPrinterBroker(mem) var serviceAdded, serviceRemoved []moleculer.Payload @@ -260,13 +260,12 @@ var _ = Describe("Registry", func() { <-scannerBroker.Call("scanner.scan", scanText) }).Should(Panic()) //broker B is stopped ... so it should panic - close(done) - }, 5) + }) }) Describe("Namespace", func() { - It("Services across namespaces cannos see each other", func(done Done) { + It("Services across namespaces cannos see each other", func() { mem := &memory.SharedMemory{} @@ -380,7 +379,6 @@ var _ = Describe("Registry", func() { stageBroker.Stop() stage2Broker.Stop() - close(done) - }, 2) + }) }) }) diff --git a/transit/nats/nats_test.go b/transit/nats/nats_test.go index c31f97c3..cfc6424b 100644 --- a/transit/nats/nats_test.go +++ b/transit/nats/nats_test.go @@ -69,7 +69,7 @@ var _ = Describe("NATS Streaming Transit", func() { }) - It("should make a remote call from profile broker a to user broker", func(done Done) { + It("should make a remote call from profile broker a to user broker", func() { userBroker.Start() profileBroker.Start() profileBroker.WaitFor("user") @@ -79,10 +79,9 @@ var _ = Describe("NATS Streaming Transit", func() { userBroker.Stop() profileBroker.Stop() - close(done) - }, 3) + }) - It("should fail after brokers are stopped", func(done Done) { + It("should fail after brokers are stopped", func() { userBroker.Start() profileBroker.Start() @@ -93,8 +92,7 @@ var _ = Describe("NATS Streaming Transit", func() { Expect((<-profileBroker.Call("user.update", longList)).IsError()).Should(BeTrue()) profileBroker.Stop() - close(done) - }, 3) + }) }) Describe("Start / Stop Cycles.", func() {