Skip to content

Commit

Permalink
Merge pull request #92 from moleculer-go/feat/protocol_v4
Browse files Browse the repository at this point in the history
Feat/protocol v4
  • Loading branch information
pentateu authored Jun 13, 2021
2 parents fcd2071 + 3ae858f commit ea742a7
Show file tree
Hide file tree
Showing 18 changed files with 186 additions and 39 deletions.
13 changes: 13 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"time"

"github.com/hashicorp/go-uuid"
bus "github.com/moleculer-go/goemitter"
"github.com/moleculer-go/moleculer"
"github.com/moleculer-go/moleculer/cache"
Expand Down Expand Up @@ -115,6 +116,8 @@ type ServiceBroker struct {

id string

instanceID string

localNode moleculer.Node
}

Expand Down Expand Up @@ -566,6 +569,13 @@ func (broker *ServiceBroker) init() {

broker.config = broker.middlewares.CallHandlers("Config", broker.config).(moleculer.Config)

instanceID, err := uuid.GenerateUUID()
if err != nil {
broker.logger.Error("Could not create an instance id - error ", err)
instanceID = "error creating instance id"
}
broker.instanceID = instanceID

broker.delegates = broker.createDelegates()
broker.registry = registry.CreateRegistry(broker.id, broker.delegates)
broker.localNode = broker.registry.LocalNode()
Expand All @@ -580,6 +590,9 @@ func (broker *ServiceBroker) createDelegates() *moleculer.BrokerDelegates {
Bus: broker.LocalBus,
IsStarted: broker.IsStarted,
Config: broker.config,
InstanceID: func() string {
return broker.instanceID
},
ActionDelegate: func(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
return broker.registry.LoadBalanceCall(context, opts...)
},
Expand Down
47 changes: 32 additions & 15 deletions context/contextFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Context struct {
meta moleculer.Payload
timeout int
level int
caller string
}

func BrokerContext(broker *moleculer.BrokerDelegates) moleculer.BrokerContext {
Expand All @@ -46,7 +47,7 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay
parentContext := context
meta := parentContext.meta
if context.broker.Config.Metrics {
meta = meta.Add("metrics", true)
meta = meta.Add("tracing", true)
}
id := util.RandomString(12)
var requestID string
Expand All @@ -55,6 +56,10 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay
} else {
requestID = id
}
caller := parentContext.actionName
if parentContext.eventName != "" {
caller = parentContext.eventName
}
eventContext := Context{
id: id,
requestID: requestID,
Expand All @@ -66,6 +71,7 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay
level: parentContext.level + 1,
meta: meta,
parentID: parentContext.id,
caller: caller,
}
return &eventContext
}
Expand All @@ -80,7 +86,7 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P
parentContext := context
meta := parentContext.meta
if context.broker.Config.Metrics {
meta = meta.Add("metrics", true)
meta = meta.Add("tracing", true)
}
if len(opts) > 0 && opts[0].Meta != nil && opts[0].Meta.Len() > 0 {
meta = meta.AddMany(opts[0].Meta.RawMap())
Expand All @@ -92,6 +98,10 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P
} else {
requestID = id
}
caller := parentContext.actionName
if parentContext.eventName != "" {
caller = parentContext.eventName
}
actionContext := Context{
id: id,
requestID: requestID,
Expand All @@ -101,15 +111,11 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P
level: parentContext.level + 1,
meta: meta,
parentID: parentContext.id,
caller: caller,
}
return &actionContext
}

// Max calling level check to avoid calling loops
func checkMaxCalls(context *Context) {

}

// ActionContext create an action context for remote call.
func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interface{}) moleculer.BrokerContext {
var level int
Expand All @@ -130,6 +136,10 @@ func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interfac
parentID = s
}
}
// params := payload.Empty()
// if values["params"] != nil {
// params = payload.New(values["params"])
// }
params := payload.New(values["params"])

if values["timeout"] != nil {
Expand Down Expand Up @@ -210,20 +220,22 @@ func (context *Context) RequestID() string {
func (context *Context) AsMap() map[string]interface{} {
mapResult := make(map[string]interface{})

var metrics bool
if context.meta.Get("metrics").Exists() {
metrics = context.meta.Get("metrics").Bool()
var tracing bool
if context.meta.Get("tracing").Exists() {
tracing = context.meta.Get("tracing").Bool()
}

mapResult["id"] = context.id
mapResult["requestID"] = context.requestID

mapResult["level"] = context.level
mapResult["meta"] = context.meta.RawMap()
mapResult["caller"] = context.caller
mapResult["tracing"] = tracing
mapResult["parentID"] = context.parentID

if context.actionName != "" {
mapResult["action"] = context.actionName
mapResult["metrics"] = metrics
mapResult["parentID"] = context.parentID
mapResult["meta"] = context.meta.RawMap()
mapResult["timeout"] = context.timeout
mapResult["params"] = context.params.Value()
}
Expand All @@ -232,11 +244,12 @@ func (context *Context) AsMap() map[string]interface{} {
mapResult["groups"] = context.groups
mapResult["broadcast"] = context.broadcast
mapResult["data"] = context.params.Value()
mapResult["meta"] = context.meta.RawMap()
mapResult["level"] = context.level
}

//TODO : check how to support streaming params in go
//streaming not supported yet
mapResult["stream"] = false
//mapResult["seq"] = 0 // for stream payloads

return mapResult
}
Expand Down Expand Up @@ -323,3 +336,7 @@ func (context *Context) Logger() *log.Entry {
}
return context.broker.Logger("context", "<root>")
}

func (context *Context) Caller() string {
return context.caller
}
49 changes: 41 additions & 8 deletions context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = g.Describe("Context", func() {
"meta": map[string]interface{}{},
})
Expect(actionContext).ShouldNot(BeNil())
Expect(len(actionContext.AsMap())).Should(Equal(10))
Expect(len(actionContext.AsMap())).Should(Equal(11))
Expect(actionContext.ActionName()).Should(Equal("action"))
Expect(actionContext.Payload()).Should(Equal(payload.Empty()))
Expect(actionContext.ID()).Should(Equal("id"))
Expand All @@ -47,7 +47,26 @@ var _ = g.Describe("Context", func() {
"parentID": "parentID",
"params": map[string]interface{}{},
})
}).Should(Panic())
}).Should(Panic()) //no action
})

g.It("Should create an action context with no params", func() {
delegates := test.DelegatesWithIdAndConfig("x", moleculer.Config{})
actionContext := ActionContext(delegates, map[string]interface{}{
"sender": "test",
"id": "id",
"action": "action",
"level": 2,
"timeout": 20,
"parentID": "parentID",
"meta": map[string]interface{}{},
})
Expect(actionContext).ShouldNot(BeNil())
Expect(len(actionContext.AsMap())).Should(Equal(11))
Expect(actionContext.ActionName()).Should(Equal("action"))
Expect(actionContext.Payload()).Should(Equal(payload.New(nil)))
Expect(actionContext.ID()).Should(Equal("id"))
Expect(actionContext.Logger()).ShouldNot(BeNil())
})

g.It("Should call SetTargetNodeID", func() {
Expand Down Expand Up @@ -81,7 +100,7 @@ var _ = g.Describe("Context", func() {
})
Expect(eventContext).ShouldNot(BeNil())
Expect(eventContext.IsBroadcast()).Should(BeTrue())
Expect(len(eventContext.AsMap())).Should(Equal(9))
Expect(len(eventContext.AsMap())).Should(Equal(12))
Expect(eventContext.EventName()).Should(Equal("event"))
Expect(eventContext.Groups()).Should(Equal([]string{"a", "b"}))
Expect(eventContext.Payload()).Should(Equal(payload.Empty()))
Expand All @@ -96,32 +115,32 @@ var _ = g.Describe("Context", func() {
}).Should(Panic())
})

g.It("Should create a child context with metrics on", func() {
g.It("Should create a child context with tracing on", func() {

config := moleculer.Config{
Metrics: true,
}
brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))
actionContext := brokerContext.ChildActionContext("actionx", nil)
Expect(actionContext.Meta()).ShouldNot(BeNil())
Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue())
Expect(actionContext.Meta().Get("tracing").Bool()).Should(BeTrue())

eventContext := brokerContext.ChildEventContext("eventx", nil, nil, false)
Expect(eventContext.Meta()).ShouldNot(BeNil())
Expect(eventContext.RequestID()).ShouldNot(Equal(""))
Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue())
Expect(actionContext.Meta().Get("tracing").Bool()).Should(BeTrue())

config = moleculer.Config{
Metrics: false,
}
brokerContext = BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))
actionContext = brokerContext.ChildActionContext("actionx", nil)
Expect(actionContext.Meta()).ShouldNot(BeNil())
Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse())
Expect(actionContext.Meta().Get("tracing").Exists()).Should(BeFalse())

eventContext = brokerContext.ChildEventContext("eventx", nil, nil, false)
Expect(eventContext.Meta()).ShouldNot(BeNil())
Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse())
Expect(actionContext.Meta().Get("tracing").Exists()).Should(BeFalse())

})

Expand Down Expand Up @@ -188,4 +207,18 @@ var _ = g.Describe("Context", func() {
Expect(called).Should(BeTrue())
})

g.It("Should create a child context with proper caller setting", func() {

config := moleculer.Config{
Metrics: true,
}
brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))

actionAContext := brokerContext.ChildActionContext("servicex.action_a", nil)

actionBContext := actionAContext.ChildActionContext("servicex.action_b", nil)

Expect(actionBContext.Caller()).Should(Equal("servicex.action_a"))
})

})
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/raft v1.0.1 // indirect
github.com/lib/pq v1.1.1 // indirect
github.com/moleculer-go/cupaloy/v2 v2.5.2
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
Expand Down
2 changes: 1 addition & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func metricsPayload(brokerContext moleculer.BrokerContext) map[string]interface{
func createShouldMetric(Config moleculer.Config) func(context moleculer.BrokerContext) bool {
var callsCount float32 = 0
return func(context moleculer.BrokerContext) bool {
if context.Meta().Get("metrics").Bool() {
if context.Meta().Get("tracing").Bool() {
callsCount++
if callsCount*Config.MetricsRate >= 1.0 {

Expand Down
5 changes: 4 additions & 1 deletion moleculer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ var DefaultConfig = Config{
RetryPolicy: RetryPolicy{
Enabled: false,
},
RequestTimeout: 1 * time.Minute,
RequestTimeout: 3 * time.Second,
MCallTimeout: 5 * time.Second,
WaitForNeighboursInterval: 200 * time.Millisecond,
}
Expand Down Expand Up @@ -203,6 +203,7 @@ type LoggerFunc func(name string, value string) *log.Entry
type BusFunc func() *bus.Emitter
type isStartedFunc func() bool
type LocalNodeFunc func() Node
type InstanceIDFunc func() string
type ActionDelegateFunc func(context BrokerContext, opts ...Options) chan Payload
type EmitEventFunc func(context BrokerContext)
type ServiceForActionFunc func(string) []*ServiceSchema
Expand Down Expand Up @@ -261,6 +262,7 @@ type BrokerContext interface {
Payload() Payload
Groups() []string
IsBroadcast() bool
Caller() string

//export context info in a map[string]
AsMap() map[string]interface{}
Expand All @@ -280,6 +282,7 @@ type BrokerContext interface {

//Needs Refactoring..2 broker interfaces.. one for regiwstry.. and for for all others.
type BrokerDelegates struct {
InstanceID InstanceIDFunc
LocalNode LocalNodeFunc
Logger LoggerFunc
Bus BusFunc
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(map[string]map[string]interface {}) (len=3) {
(string) (len=13) "nodeCpuBroker": (map[string]interface {}) (len=8) {
(string) (len=13) "nodeCpuBroker": (map[string]interface {}) (len=9) {
(string) (len=2) "id": (string) (len=14) "node_cpuBroker",
(string) (len=3) "cpu": (int64) 0,
(string) (len=3) "seq": (string) (len=7) "removed",
Expand All @@ -13,9 +13,11 @@
(string) (len=13) "100.100.0.100"
},
(string) (len=8) "hostname": (string) (len=7) "removed",
(string) (len=8) "metadata": (map[string]interface {}) {
},
(string) (len=9) "available": (bool) true
},
(string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=8) {
(string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=9) {
(string) (len=2) "id": (string) (len=18) "node_printerBroker",
(string) (len=3) "cpu": (int64) 0,
(string) (len=3) "seq": (string) (len=7) "removed",
Expand All @@ -29,9 +31,11 @@
(string) (len=13) "100.100.0.100"
},
(string) (len=8) "hostname": (string) (len=7) "removed",
(string) (len=8) "metadata": (map[string]interface {}) {
},
(string) (len=9) "available": (bool) true
},
(string) (len=17) "nodeScannerBroker": (map[string]interface {}) (len=8) {
(string) (len=17) "nodeScannerBroker": (map[string]interface {}) (len=9) {
(string) (len=2) "id": (string) (len=18) "node_scannerBroker",
(string) (len=3) "cpu": (int64) 0,
(string) (len=3) "seq": (string) (len=7) "removed",
Expand All @@ -45,6 +49,8 @@
(string) (len=13) "100.100.0.100"
},
(string) (len=8) "hostname": (string) (len=7) "removed",
(string) (len=8) "metadata": (map[string]interface {}) {
},
(string) (len=9) "available": (bool) true
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(map[string]map[string]interface {}) (len=3) {
(string) (len=13) "nodeCpuBroker": (map[string]interface {}) <nil>,
(string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=8) {
(string) (len=17) "nodePrinterBroker": (map[string]interface {}) (len=9) {
(string) (len=2) "id": (string) (len=18) "node_printerBroker",
(string) (len=3) "cpu": (int64) 0,
(string) (len=3) "seq": (string) (len=7) "removed",
Expand All @@ -14,6 +14,8 @@
(string) (len=13) "100.100.0.100"
},
(string) (len=8) "hostname": (string) (len=7) "removed",
(string) (len=8) "metadata": (map[string]interface {}) {
},
(string) (len=9) "available": (bool) true
},
(string) (len=17) "nodeScannerBroker": (map[string]interface {}) <nil>
Expand Down
Loading

0 comments on commit ea742a7

Please sign in to comment.