Skip to content

Commit

Permalink
Merge commit 'ea742a76c707b0da99b57414b8a4d1c14cf8fb7d'
Browse files Browse the repository at this point in the history
  • Loading branch information
pentateu committed Apr 18, 2022
2 parents e410037 + ea742a7 commit b677de3
Show file tree
Hide file tree
Showing 20 changed files with 287 additions and 96 deletions.
2 changes: 2 additions & 0 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ steps:
- export STAN_HOST="nats-streaming"
- export NATS_HOST="nats-streaming"
- export AMQP_HOST="guest:guest@rabbitmq"
- go get github.com/onsi/ginkgo/[email protected]
- go get github.com/onsi/ginkgo/ginkgo/[email protected]
- go build
- go run github.com/onsi/ginkgo/ginkgo -r --keepGoing --cover --trace -skipPackage=amqp
# - curl -sL https://deb.nodesource.com/setup_12.x | bash -
Expand Down
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ Simple, fast, light and fun to develop with. Also easy, very easy to test ;)

- [http://gomicro.services (Official site and Documentation)](http://gomicro.services)
- [Database examples](https://moleculer-go-site.herokuapp.com/docs/0.1/store.html)
- [WhatsApp App](https://github.com/moleculer-go/example-whatsapp)
- [Benchmark](https://github.com/moleculer-go/benchmark)

## Example

Expand Down Expand Up @@ -65,12 +63,8 @@ func main() {

# Roadmap

![Timeline](https://moleculer-go-site.herokuapp.com/images/timeline.png)

## v0.1.0 (MVP)

Development is `complete` - Documentation is `in-progress` and benchmark is also `in-progress`.

**Contents:**

- Service Broker
Expand Down Expand Up @@ -99,7 +93,7 @@ Development is `complete` - Documentation is `in-progress` and benchmark is also
## v0.3.0 (Beta)

- Performance and Optimization
- More DB Adaptors (SQLLite, Firebase, MySQL)
- More DB Adaptors (Firebase, MySQL)
- CLI for Project Seed Generation

## v0.4.0 (Alpha)
Expand Down
13 changes: 13 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"time"

"github.com/hashicorp/go-uuid"
bus "github.com/moleculer-go/goemitter"
"github.com/moleculer-go/moleculer"
"github.com/moleculer-go/moleculer/cache"
Expand Down Expand Up @@ -115,6 +116,8 @@ type ServiceBroker struct {

id string

instanceID string

localNode moleculer.Node
}

Expand Down Expand Up @@ -566,6 +569,13 @@ func (broker *ServiceBroker) init() {

broker.config = broker.middlewares.CallHandlers("Config", broker.config).(moleculer.Config)

instanceID, err := uuid.GenerateUUID()
if err != nil {
broker.logger.Error("Could not create an instance id - error ", err)
instanceID = "error creating instance id"
}
broker.instanceID = instanceID

broker.delegates = broker.createDelegates()
broker.registry = registry.CreateRegistry(broker.id, broker.delegates)
broker.localNode = broker.registry.LocalNode()
Expand All @@ -580,6 +590,9 @@ func (broker *ServiceBroker) createDelegates() *moleculer.BrokerDelegates {
Bus: broker.LocalBus,
IsStarted: broker.IsStarted,
Config: broker.config,
InstanceID: func() string {
return broker.instanceID
},
ActionDelegate: func(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
return broker.registry.LoadBalanceCall(context, opts...)
},
Expand Down
47 changes: 32 additions & 15 deletions context/contextFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Context struct {
meta moleculer.Payload
timeout int
level int
caller string
}

func BrokerContext(broker *moleculer.BrokerDelegates) moleculer.BrokerContext {
Expand All @@ -46,7 +47,7 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay
parentContext := context
meta := parentContext.meta
if context.broker.Config.Metrics {
meta = meta.Add("metrics", true)
meta = meta.Add("tracing", true)
}
id := util.RandomString(12)
var requestID string
Expand All @@ -55,6 +56,10 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay
} else {
requestID = id
}
caller := parentContext.actionName
if parentContext.eventName != "" {
caller = parentContext.eventName
}
eventContext := Context{
id: id,
requestID: requestID,
Expand All @@ -66,6 +71,7 @@ func (context *Context) ChildEventContext(eventName string, params moleculer.Pay
level: parentContext.level + 1,
meta: meta,
parentID: parentContext.id,
caller: caller,
}
return &eventContext
}
Expand All @@ -80,7 +86,7 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P
parentContext := context
meta := parentContext.meta
if context.broker.Config.Metrics {
meta = meta.Add("metrics", true)
meta = meta.Add("tracing", true)
}
if len(opts) > 0 && opts[0].Meta != nil && opts[0].Meta.Len() > 0 {
meta = meta.AddMany(opts[0].Meta.RawMap())
Expand All @@ -92,6 +98,10 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P
} else {
requestID = id
}
caller := parentContext.actionName
if parentContext.eventName != "" {
caller = parentContext.eventName
}
actionContext := Context{
id: id,
requestID: requestID,
Expand All @@ -101,15 +111,11 @@ func (context *Context) ChildActionContext(actionName string, params moleculer.P
level: parentContext.level + 1,
meta: meta,
parentID: parentContext.id,
caller: caller,
}
return &actionContext
}

// Max calling level check to avoid calling loops
func checkMaxCalls(context *Context) {

}

// ActionContext create an action context for remote call.
func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interface{}) moleculer.BrokerContext {
var level int
Expand All @@ -130,6 +136,10 @@ func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interfac
parentID = s
}
}
// params := payload.Empty()
// if values["params"] != nil {
// params = payload.New(values["params"])
// }
params := payload.New(values["params"])

if values["timeout"] != nil {
Expand Down Expand Up @@ -210,20 +220,22 @@ func (context *Context) RequestID() string {
func (context *Context) AsMap() map[string]interface{} {
mapResult := make(map[string]interface{})

var metrics bool
if context.meta.Get("metrics").Exists() {
metrics = context.meta.Get("metrics").Bool()
var tracing bool
if context.meta.Get("tracing").Exists() {
tracing = context.meta.Get("tracing").Bool()
}

mapResult["id"] = context.id
mapResult["requestID"] = context.requestID

mapResult["level"] = context.level
mapResult["meta"] = context.meta.RawMap()
mapResult["caller"] = context.caller
mapResult["tracing"] = tracing
mapResult["parentID"] = context.parentID

if context.actionName != "" {
mapResult["action"] = context.actionName
mapResult["metrics"] = metrics
mapResult["parentID"] = context.parentID
mapResult["meta"] = context.meta.RawMap()
mapResult["timeout"] = context.timeout
mapResult["params"] = context.params.Value()
}
Expand All @@ -232,11 +244,12 @@ func (context *Context) AsMap() map[string]interface{} {
mapResult["groups"] = context.groups
mapResult["broadcast"] = context.broadcast
mapResult["data"] = context.params.Value()
mapResult["meta"] = context.meta.RawMap()
mapResult["level"] = context.level
}

//TODO : check how to support streaming params in go
//streaming not supported yet
mapResult["stream"] = false
//mapResult["seq"] = 0 // for stream payloads

return mapResult
}
Expand Down Expand Up @@ -323,3 +336,7 @@ func (context *Context) Logger() *log.Entry {
}
return context.broker.Logger("context", "<root>")
}

func (context *Context) Caller() string {
return context.caller
}
49 changes: 41 additions & 8 deletions context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = g.Describe("Context", func() {
"meta": map[string]interface{}{},
})
Expect(actionContext).ShouldNot(BeNil())
Expect(len(actionContext.AsMap())).Should(Equal(10))
Expect(len(actionContext.AsMap())).Should(Equal(11))
Expect(actionContext.ActionName()).Should(Equal("action"))
Expect(actionContext.Payload()).Should(Equal(payload.Empty()))
Expect(actionContext.ID()).Should(Equal("id"))
Expand All @@ -47,7 +47,26 @@ var _ = g.Describe("Context", func() {
"parentID": "parentID",
"params": map[string]interface{}{},
})
}).Should(Panic())
}).Should(Panic()) //no action
})

g.It("Should create an action context with no params", func() {
delegates := test.DelegatesWithIdAndConfig("x", moleculer.Config{})
actionContext := ActionContext(delegates, map[string]interface{}{
"sender": "test",
"id": "id",
"action": "action",
"level": 2,
"timeout": 20,
"parentID": "parentID",
"meta": map[string]interface{}{},
})
Expect(actionContext).ShouldNot(BeNil())
Expect(len(actionContext.AsMap())).Should(Equal(11))
Expect(actionContext.ActionName()).Should(Equal("action"))
Expect(actionContext.Payload()).Should(Equal(payload.New(nil)))
Expect(actionContext.ID()).Should(Equal("id"))
Expect(actionContext.Logger()).ShouldNot(BeNil())
})

g.It("Should call SetTargetNodeID", func() {
Expand Down Expand Up @@ -81,7 +100,7 @@ var _ = g.Describe("Context", func() {
})
Expect(eventContext).ShouldNot(BeNil())
Expect(eventContext.IsBroadcast()).Should(BeTrue())
Expect(len(eventContext.AsMap())).Should(Equal(9))
Expect(len(eventContext.AsMap())).Should(Equal(12))
Expect(eventContext.EventName()).Should(Equal("event"))
Expect(eventContext.Groups()).Should(Equal([]string{"a", "b"}))
Expect(eventContext.Payload()).Should(Equal(payload.Empty()))
Expand All @@ -96,32 +115,32 @@ var _ = g.Describe("Context", func() {
}).Should(Panic())
})

g.It("Should create a child context with metrics on", func() {
g.It("Should create a child context with tracing on", func() {

config := moleculer.Config{
Metrics: true,
}
brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))
actionContext := brokerContext.ChildActionContext("actionx", nil)
Expect(actionContext.Meta()).ShouldNot(BeNil())
Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue())
Expect(actionContext.Meta().Get("tracing").Bool()).Should(BeTrue())

eventContext := brokerContext.ChildEventContext("eventx", nil, nil, false)
Expect(eventContext.Meta()).ShouldNot(BeNil())
Expect(eventContext.RequestID()).ShouldNot(Equal(""))
Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue())
Expect(actionContext.Meta().Get("tracing").Bool()).Should(BeTrue())

config = moleculer.Config{
Metrics: false,
}
brokerContext = BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))
actionContext = brokerContext.ChildActionContext("actionx", nil)
Expect(actionContext.Meta()).ShouldNot(BeNil())
Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse())
Expect(actionContext.Meta().Get("tracing").Exists()).Should(BeFalse())

eventContext = brokerContext.ChildEventContext("eventx", nil, nil, false)
Expect(eventContext.Meta()).ShouldNot(BeNil())
Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse())
Expect(actionContext.Meta().Get("tracing").Exists()).Should(BeFalse())

})

Expand Down Expand Up @@ -188,4 +207,18 @@ var _ = g.Describe("Context", func() {
Expect(called).Should(BeTrue())
})

g.It("Should create a child context with proper caller setting", func() {

config := moleculer.Config{
Metrics: true,
}
brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))

actionAContext := brokerContext.ChildActionContext("servicex.action_a", nil)

actionBContext := actionAContext.ChildActionContext("servicex.action_b", nil)

Expect(actionBContext.Caller()).Should(Equal("servicex.action_a"))
})

})
15 changes: 5 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ module github.com/moleculer-go/moleculer
go 1.12

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/raft v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/lib/pq v1.1.1 // indirect
github.com/moleculer-go/cupaloy/v2 v2.5.2
github.com/moleculer-go/goemitter v1.0.3
Expand All @@ -22,19 +19,17 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/onsi/ginkgo v1.16.2
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/prometheus/procfs v0.0.0-20190503130316-740c07785007 // indirect
github.com/sirupsen/logrus v1.4.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
github.com/tidwall/pretty v1.1.0 // indirect
github.com/tidwall/sjson v1.0.4
go.etcd.io/bbolt v1.3.2 // indirect
go.mongodb.org/mongo-driver v1.0.1
go.mongodb.org/mongo-driver v1.5.2
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect
golang.org/x/tools v0.1.0 // indirect
google.golang.org/appengine v1.5.0 // indirect
)
Loading

0 comments on commit b677de3

Please sign in to comment.